diff --git a/app/api_report.go b/app/api_report.go index f4bfa329c9..1821d71f6a 100644 --- a/app/api_report.go +++ b/app/api_report.go @@ -2,10 +2,12 @@ package main import ( "net/http" + + "github.com/weaveworks/scope/xfer" ) // Raw report handler -func makeRawReportHandler(rep Reporter) func(http.ResponseWriter, *http.Request) { +func makeRawReportHandler(rep xfer.Reporter) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { // r.ParseForm() respondWith(w, http.StatusOK, rep.Report()) diff --git a/app/api_topologies.go b/app/api_topologies.go index 6c31fb795d..5f82aa674e 100644 --- a/app/api_topologies.go +++ b/app/api_topologies.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/weaveworks/scope/render" + "github.com/weaveworks/scope/xfer" ) // APITopologyDesc is returned in a list by the /api/topology handler. @@ -21,7 +22,7 @@ type topologyStats struct { } // makeTopologyList returns a handler that yields an APITopologyList. -func makeTopologyList(rep Reporter) func(w http.ResponseWriter, r *http.Request) { +func makeTopologyList(rep xfer.Reporter) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { var ( rpt = rep.Report() diff --git a/app/api_topology.go b/app/api_topology.go index 9953414b20..204ddaa65c 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -9,6 +9,7 @@ import ( "github.com/weaveworks/scope/render" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" ) const ( @@ -32,14 +33,14 @@ type APIEdge struct { } // Full topology. -func handleTopology(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { +func handleTopology(rep xfer.Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { respondWith(w, http.StatusOK, APITopology{ Nodes: t.renderer.Render(rep.Report()), }) } // Websocket for the full topology. This route overlaps with the next. -func handleWs(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { +func handleWs(rep xfer.Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { respondWith(w, http.StatusInternalServerError, err.Error()) return @@ -56,7 +57,7 @@ func handleWs(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Reque } // Individual nodes. -func handleNode(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { +func handleNode(rep xfer.Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { var ( vars = mux.Vars(r) nodeID = vars["id"] @@ -71,7 +72,7 @@ func handleNode(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Req } // Individual edges. -func handleEdge(rep Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { +func handleEdge(rep xfer.Reporter, t topologyView, w http.ResponseWriter, r *http.Request) { var ( vars = mux.Vars(r) localID = vars["local"] @@ -90,7 +91,7 @@ var upgrader = websocket.Upgrader{ func handleWebsocket( w http.ResponseWriter, r *http.Request, - rep Reporter, + rep xfer.Reporter, t topologyView, loop time.Duration, ) { diff --git a/app/main.go b/app/main.go index 1f06ae5a6c..01f569cd77 100644 --- a/app/main.go +++ b/app/main.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "log" "math/rand" "net/http" @@ -21,35 +20,17 @@ var version = "dev" func main() { var ( - defaultProbes = []string{fmt.Sprintf("localhost:%d", xfer.ProbePort), fmt.Sprintf("scope.weave.local:%d", xfer.ProbePort)} - batch = flag.Duration("batch", 1*time.Second, "batch interval") - window = flag.Duration("window", 15*time.Second, "window") - listen = flag.String("http.address", ":"+strconv.Itoa(xfer.AppPort), "webserver listen address") - printVersion = flag.Bool("version", false, "print version number and exit") + window = flag.Duration("window", 15*time.Second, "window") + listen = flag.String("http.address", ":"+strconv.Itoa(xfer.AppPort), "webserver listen address") ) flag.Parse() - probes := append(defaultProbes, flag.Args()...) - - if *printVersion { - fmt.Println(version) - return - } rand.Seed(time.Now().UnixNano()) id := strconv.FormatInt(rand.Int63(), 16) - log.Printf("app starting, version %s, id %s", version, id) - - // Collector deals with the probes, and generates merged reports. - c := xfer.NewCollector(*batch, id) - defer c.Stop() - - r := newStaticResolver(probes, c.Add) - defer r.Stop() - - lifo := NewReportLIFO(c, *window) - defer lifo.Stop() + log.Printf("app starting, version %s, ID %s", version, id) - http.Handle("/", Router(lifo)) + c := xfer.NewCollector(*window) + http.Handle("/", Router(c)) irq := interrupt() go func() { log.Printf("listening on %s", *listen) diff --git a/app/mock_reporter_test.go b/app/mock_reporter_test.go index 20c03dcad7..711c9b2094 100644 --- a/app/mock_reporter_test.go +++ b/app/mock_reporter_test.go @@ -5,9 +5,9 @@ import ( "github.com/weaveworks/scope/test" ) -// StaticReport is used as know test data in api tests. +// StaticReport is used as a fixture in tests. It emulates an xfer.Collector. type StaticReport struct{} -func (s StaticReport) Report() report.Report { - return test.Report -} +func (s StaticReport) Report() report.Report { return test.Report } + +func (s StaticReport) Add(report.Report) {} diff --git a/app/origin_host.go b/app/origin_host.go index 279b9aaaf9..1dab746114 100644 --- a/app/origin_host.go +++ b/app/origin_host.go @@ -8,6 +8,7 @@ import ( "github.com/weaveworks/scope/probe/host" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" ) // OriginHost represents a host that runs a probe, i.e. the origin host of @@ -35,7 +36,7 @@ func getOriginHost(t report.Topology, nodeID string) (OriginHost, bool) { } // makeOriginHostHandler makes the /api/origin/* handler. -func makeOriginHostHandler(rep Reporter) http.HandlerFunc { +func makeOriginHostHandler(rep xfer.Reporter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var ( vars = mux.Vars(r) diff --git a/app/report_lifo.go b/app/report_lifo.go deleted file mode 100644 index 31824ef118..0000000000 --- a/app/report_lifo.go +++ /dev/null @@ -1,96 +0,0 @@ -package main - -import ( - "time" - - "github.com/weaveworks/scope/report" -) - -// Reporter is something which generates a single 'current' report over a -// stream of incoming reports. -type Reporter interface { - Report() report.Report -} - -type timedReport struct { - report.Report - Timestamp time.Time -} - -// ReportLIFO keeps a short-term history of reports. -type ReportLIFO struct { - reports []timedReport - requests chan chan report.Report - quit chan chan struct{} -} - -type reporter interface { - Reports() <-chan report.Report -} - -// NewReportLIFO collects reports up to a certain age. -func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO { - l := ReportLIFO{ - reports: []timedReport{}, - requests: make(chan chan report.Report), - quit: make(chan chan struct{}), - } - - go func() { - for { - select { - case report := <-r.Reports(): - // Incoming report from the collecter. - tr := timedReport{ - Timestamp: time.Now(), - Report: report, - } - l.reports = append(l.reports, tr) - l.reports = cleanOld(l.reports, time.Now().Add(-maxAge)) - - case req := <-l.requests: - // Request for the current report. - report := report.MakeReport() - oldest := time.Now() - for _, r := range l.reports { - if r.Timestamp.Before(oldest) { - oldest = r.Timestamp - } - report.Merge(r.Report) - } - report.Window = time.Now().Sub(oldest) - req <- report - - case q := <-l.quit: - close(q) - return - } - } - }() - return &l -} - -// Stop shuts down the monitor. -func (r *ReportLIFO) Stop() { - q := make(chan struct{}) - r.quit <- q - <-q -} - -// Report returns the latest report. -func (r *ReportLIFO) Report() report.Report { - req := make(chan report.Report) - r.requests <- req - return <-req -} - -func cleanOld(reports []timedReport, threshold time.Time) []timedReport { - res := make([]timedReport, 0, len(reports)) - for _, tr := range reports { - if tr.Timestamp.Before(threshold) { - continue - } - res = append(res, tr) - } - return res -} diff --git a/app/router.go b/app/router.go index abfa52cfe9..548fe8994b 100644 --- a/app/router.go +++ b/app/router.go @@ -1,6 +1,7 @@ package main import ( + "encoding/gob" "net/http" "net/url" "strings" @@ -8,6 +9,8 @@ import ( "github.com/gorilla/mux" "github.com/weaveworks/scope/render" + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" ) // URLMatcher uses request.RequestURI (the raw, unparsed request) to attempt @@ -41,10 +44,17 @@ func URLMatcher(pattern string) mux.MatcherFunc { } } -// Router gives of the HTTP dispatcher. It will always use the embedded HTML -// resources. -func Router(c Reporter) *mux.Router { +type collector interface { + xfer.Reporter + xfer.Adder +} + +// Router returns the HTTP dispatcher, managing API and UI requests, and +// accepting reports from probes.. It will always use the embedded HTML +// resources for the UI. +func Router(c collector) *mux.Router { router := mux.NewRouter() + router.HandleFunc("/api/report", makeReportPostHandler(c)).Methods("POST") get := router.Methods("GET").Subrouter() get.HandleFunc("/api", apiHandler) get.HandleFunc("/api/topology", makeTopologyList(c)) @@ -58,7 +68,19 @@ func Router(c Reporter) *mux.Router { return router } -func captureTopology(rep Reporter, f func(Reporter, topologyView, http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) { +func makeReportPostHandler(a xfer.Adder) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var rpt report.Report + if err := gob.NewDecoder(r.Body).Decode(&rpt); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.Add(rpt) + w.WriteHeader(http.StatusOK) + } +} + +func captureTopology(rep xfer.Reporter, f func(xfer.Reporter, topologyView, http.ResponseWriter, *http.Request)) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { topology, ok := topologyRegistry[mux.Vars(r)["topology"]] if !ok { diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index fcc4d477fc..49efb8cc07 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -54,10 +54,10 @@ if [ -n "$DNS_SERVER" -a -n "$SEARCHPATH" ]; then fi # End of the command line can optionally be some -# addresses of probes to connect to, for people not -# using Weave DNS. We stick these in /etc/weave/probes -# for the run-app script to pick up. -MANUAL_PROBES=$@ -echo "$MANUAL_PROBES" >/etc/weave/probes +# addresses of apps to connect to, for people not +# using Weave DNS. We stick these in /etc/weave/apps +# for the run-probe script to pick up. +MANUAL_APPS=$@ +echo "$MANUAL_APPS" >/etc/weave/apps exec /sbin/runsvdir /etc/service diff --git a/docker/run-app b/docker/run-app index 05545c852f..b5542a8624 100755 --- a/docker/run-app +++ b/docker/run-app @@ -1,3 +1,3 @@ #!/bin/sh -exec /home/weave/scope-app $(cat /etc/weave/scope-app.args) $(cat /etc/weave/probes) +exec /home/weave/scope-app $(cat /etc/weave/scope-app.args) diff --git a/docker/run-probe b/docker/run-probe index cf07a47da0..517a3f5189 100755 --- a/docker/run-probe +++ b/docker/run-probe @@ -1,3 +1,3 @@ #!/bin/sh -exec /home/weave/scope-probe $(cat /etc/weave/scope-probe.args) +exec /home/weave/scope-probe $(cat /etc/weave/scope-probe.args) $(cat /etc/weave/apps) diff --git a/experimental/bridge/Makefile b/experimental/bridge/Makefile deleted file mode 100644 index 70334ba496..0000000000 --- a/experimental/bridge/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -.PHONY: all vet lint build test clean - -all: build test vet lint - -vet: - go vet ./... - -lint: - golint . - -build: - go build - -test: - go test - -clean: - go clean - diff --git a/experimental/bridge/main.go b/experimental/bridge/main.go deleted file mode 100644 index e2e98810e4..0000000000 --- a/experimental/bridge/main.go +++ /dev/null @@ -1,207 +0,0 @@ -package main - -import ( - "flag" - "log" - "net" - _ "net/http/pprof" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "github.com/weaveworks/scope/render" - "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/xfer" -) - -const ( - trafficTimeout = 2 * time.Minute -) - -func main() { - var ( - listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address") - probes = flag.String("probes", "", "list of all initial probes, comma separated") - batch = flag.Duration("batch", 1*time.Second, "batch interval") - version = flag.Bool("version", false, "print version number and exit") - ) - flag.Parse() - - if len(flag.Args()) != 0 { - flag.Usage() - os.Exit(1) - } - - if *version { - //fmt.Printf("%s\n", probe.Version) - return - } - - if *probes == "" { - log.Fatal("no probes given via -probes") - } - - log.Printf("starting") - - fixedAddresses := strings.Split(*probes, ",") - - // Collector deals with the probes, and generates a single merged report - // every second. - c := xfer.NewCollector(*batch, "id") - for _, addr := range fixedAddresses { - c.Add(addr) - } - defer c.Stop() - - publisher, err := xfer.NewTCPPublisher(*listen) - if err != nil { - log.Fatal(err) - } - defer publisher.Close() - log.Printf("listening on %s\n", *listen) - - var fixedIPs []string - for _, a := range fixedAddresses { - if addr, _, err := net.SplitHostPort(a); err == nil { - fixedIPs = append(fixedIPs, addr) - } - } - go discover(c, publisher, fixedIPs) - - <-interrupt() - - log.Printf("shutting down") -} - -func interrupt() chan os.Signal { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return c -} - -type collector interface { - Reports() <-chan report.Report - Remove(string) - Add(string) -} - -type publisher xfer.Publisher - -// makeAvoid makes a map with IPs we don't want to consider in discover(). It -// is the set of IPs which the bridge is configured to connect to, and the all -// the IPs from the local interfaces. -func makeAvoid(fixed []string) map[string]struct{} { - avoid := map[string]struct{}{} - - // Don't discover fixed probes. This way we'll never remove them. - for _, a := range fixed { - avoid[a] = struct{}{} - } - - // Don't go Ouroboros. - if localNets, err := net.InterfaceAddrs(); err == nil { - for _, n := range localNets { - if net, ok := n.(*net.IPNet); ok { - avoid[net.IP.String()] = struct{}{} - } - } - } - - return avoid -} - -// discover reads reports from a collector and republishes them on the -// publisher, while scanning the reports for IPs to connect to. Only addresses -// in the network topology of the report are considered. IPs listed in fixed -// are always skipped. -func discover(c collector, p publisher, fixed []string) { - lastSeen := map[string]time.Time{} - - avoid := makeAvoid(fixed) - - for r := range c.Reports() { - // log.Printf("got a report") - p.Publish(r) - - var ( - now = time.Now() - localNets = render.LocalNetworks(r) - ) - - for _, adjacent := range r.Address.Adjacency { - for _, a := range adjacent { - ip := report.AddressIDAddresser(a) // address id -> IP - if ip == nil { - continue - } - - addr := ip.String() - if _, ok := avoid[addr]; ok { - continue - } - // log.Printf("potential address: %v (via %s)", addr, src) - if _, ok := lastSeen[addr]; !ok { - if interestingAddress(localNets, addr) { - log.Printf("discovery %v: potential probe address", addr) - c.Add(addressToDial(addr)) - } else { - log.Printf("discovery %v: non-probe address", addr) - } - } - - // We always add addr to lastSeen[], even if it's a non-local - // address. This way they are part of the normal timeout logic, - // and we won't analyze the address again until it re-appears - // after a timeout. - lastSeen[addr] = now - } - } - - for addr, last := range lastSeen { - if now.Sub(last) > trafficTimeout { - // Timeout can be for a non-local address, which we didn't - // connect to. In that case the RemoveAddress() call won't do - // anything. - log.Printf("discovery %v: traffic timeout", addr) - delete(lastSeen, addr) - c.Remove(addressToDial(addr)) - } - } - } -} - -// interestingAddress tells whether the address is a local and normal address, -// which we want to try to connect to. -func interestingAddress(localNets []*net.IPNet, addr string) bool { - if addr == "" { - return false - } - - // The address is expected to be an IPv{4,6} address. - ip := net.ParseIP(addr) - if ip == nil { - return false - } - - // Filter out localhost, broadcast, and other non-connectable addresses. - if !validateRemoteAddr(ip) { - return false - } - - // Only connect to addresses we know are localnet. - for _, n := range localNets { - if n.Contains(ip) { - return true - } - } - return false -} - -// addressToDial formats an IP address so we can pass it on to Dial(). -func addressToDial(address string) string { - // return fmt.Sprintf("[%s]:%d", addr, probePort) - return net.JoinHostPort(address, strconv.Itoa(xfer.ProbePort)) -} diff --git a/experimental/bridge/net.go b/experimental/bridge/net.go deleted file mode 100644 index a0ed50b23c..0000000000 --- a/experimental/bridge/net.go +++ /dev/null @@ -1,71 +0,0 @@ -package main - -import ( - "net" -) - -func validateRemoteAddr(ip net.IP) bool { - if ip == nil { - return false - } - if ip.IsInterfaceLocalMulticast() { - return false - } - if ip.IsLinkLocalMulticast() { - return false - } - if ip.IsLinkLocalUnicast() { - return false - } - if ip.IsLoopback() { - return false - } - if ip.IsMulticast() { - return false - } - if ip.IsUnspecified() { - return false - } - if isBroadcasty(ip) { - return false - } - - return true -} - -func isBroadcasty(ip net.IP) bool { - if ip4 := ip.To4(); ip4 != nil { - if ip4.Equal(net.IPv4bcast) { - return true - } - if ip4.Equal(net.IPv4allsys) { - return true - } - if ip4.Equal(net.IPv4allrouter) { - return true - } - if ip4.Equal(net.IPv4zero) { - return true - } - return false - } - if ip.Equal(net.IPv6zero) { - return true - } - if ip.Equal(net.IPv6unspecified) { - return true - } - if ip.Equal(net.IPv6loopback) { - return true - } - if ip.Equal(net.IPv6interfacelocalallnodes) { - return true - } - if ip.Equal(net.IPv6linklocalallnodes) { - return true - } - if ip.Equal(net.IPv6linklocalallrouters) { - return true - } - return false -} diff --git a/experimental/bridge/net_test.go b/experimental/bridge/net_test.go deleted file mode 100644 index fcbac70d59..0000000000 --- a/experimental/bridge/net_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package main - -import ( - "net" - "testing" -) - -func TestValidRemoteAddr(t *testing.T) { - for input, expected := range map[string]bool{ - "localhost": false, - - "127.0.0.1": false, // should be same as loopback - "127.1.2.3": false, // 127.0.0.0/8 is all loopback - "0.0.0.0": false, - "224.0.0.1": false, // all systems - "224.0.0.2": false, // all routers - "224.0.0.22": false, - "255.255.255.255": false, // broadcast - "1.2.3.4": true, - - "::": false, // unspecified - "0:0:0:0:0:0:0:0": false, // unspecified (alternate form) - "b8:27:eb:a4:bf:6e": false, - "fe80::1240:f3ff:fe80:5474": false, // loopback - "fe80::1": false, // loopback - "::1": false, // loopback - "0:0:0:0:0:0:0:1": false, // loopback (alternate form) - "2001:db8::1:0:0:1": true, // valid - - // http://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml - "FF01:0:0:0:0:0:0:1": false, // Node-local all-nodes - // "FF01:0:0:0:0:0:0:2": false, // Node-local all-routers, isn't spec'd - // in package net/ip - "FF02:0:0:0:0:0:0:1": false, // Link-local all-nodes - "FF02:0:0:0:0:0:0:2": false, // Link-local all-routers - } { - if got := validateRemoteAddr(net.ParseIP(input)); expected != got { - t.Errorf("%s: expected %v, got %v", input, expected, got) - } - } -} diff --git a/experimental/demoprobe/generate.go b/experimental/demoprobe/generate.go deleted file mode 100644 index c2aa97f0e8..0000000000 --- a/experimental/demoprobe/generate.go +++ /dev/null @@ -1,126 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - "net" - "strconv" - "time" - - "github.com/weaveworks/scope/probe/docker" - "github.com/weaveworks/scope/probe/process" - "github.com/weaveworks/scope/report" -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -// DemoReport makes up a report. -func DemoReport(nodeCount int) report.Report { - r := report.MakeReport() - - // Make up some plausible IPv4 numbers - hosts := []string{} - ip := [4]int{192, 168, 1, 1} - for range make([]struct{}, nodeCount) { - hosts = append(hosts, fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])) - ip[3]++ - if ip[3] > 200 { - ip[2]++ - ip[3] = 1 - } - } - // Some non-local ones. - hosts = append(hosts, []string{"1.2.3.4", "2.3.4.5"}...) - - _, localNet, err := net.ParseCIDR("192.168.0.0/16") - if err != nil { - panic(err) - } - - type conn struct { - srcProc, dstProc string - dstPort int - } - procPool := []conn{ - {srcProc: "curl", dstPort: 80, dstProc: "apache"}, - {srcProc: "wget", dstPort: 80, dstProc: "apache"}, - {srcProc: "curl", dstPort: 80, dstProc: "nginx"}, - {srcProc: "curl", dstPort: 8080, dstProc: "app1"}, - {srcProc: "nginx", dstPort: 8080, dstProc: "app1"}, - {srcProc: "nginx", dstPort: 8080, dstProc: "app2"}, - {srcProc: "nginx", dstPort: 8080, dstProc: "app3"}, - } - connectionCount := nodeCount * 2 - for i := 0; i < connectionCount; i++ { - var ( - c = procPool[rand.Intn(len(procPool))] - src = hosts[rand.Intn(len(hosts))] - dst = hosts[rand.Intn(len(hosts))] - srcPort = rand.Intn(50000) + 10000 - srcPortID = report.MakeEndpointNodeID("", src, strconv.Itoa(srcPort)) - dstPortID = report.MakeEndpointNodeID("", dst, strconv.Itoa(c.dstPort)) - srcID = report.MakeAdjacencyID(srcPortID) - dstID = report.MakeAdjacencyID(dstPortID) - srcAddressID = report.MakeAddressNodeID("", src) - dstAddressID = report.MakeAddressNodeID("", dst) - nodeSrcAddressID = report.MakeAdjacencyID(srcAddressID) - nodeDstAddressID = report.MakeAdjacencyID(dstAddressID) - ) - - // Endpoint topology - if _, ok := r.Endpoint.NodeMetadatas[srcPortID]; !ok { - r.Endpoint.NodeMetadatas[srcPortID] = report.MakeNodeMetadataWith(map[string]string{ - process.PID: "4000", - "name": c.srcProc, - "domain": "node-" + src, - }) - } - r.Endpoint.Adjacency[srcID] = r.Endpoint.Adjacency[srcID].Add(dstPortID) - if _, ok := r.Endpoint.NodeMetadatas[dstPortID]; !ok { - r.Endpoint.NodeMetadatas[dstPortID] = report.MakeNodeMetadataWith(map[string]string{ - process.PID: "4000", - "name": c.dstProc, - "domain": "node-" + dst, - }) - } - r.Endpoint.Adjacency[dstID] = r.Endpoint.Adjacency[dstID].Add(srcPortID) - var ( - edgeKeyEgress = report.MakeEdgeID(srcPortID, dstPortID) - edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) - ) - r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - } - r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ - MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), - } - - // Address topology - if _, ok := r.Address.NodeMetadatas[srcAddressID]; !ok { - r.Address.NodeMetadatas[srcAddressID] = report.MakeNodeMetadataWith(map[string]string{ - docker.Name: src, - }) - } - r.Address.Adjacency[nodeSrcAddressID] = r.Address.Adjacency[nodeSrcAddressID].Add(dstAddressID) - if _, ok := r.Address.NodeMetadatas[dstAddressID]; !ok { - r.Address.NodeMetadatas[dstAddressID] = report.MakeNodeMetadataWith(map[string]string{ - docker.Name: dst, - }) - } - r.Address.Adjacency[nodeDstAddressID] = r.Address.Adjacency[nodeDstAddressID].Add(srcAddressID) - - // Host data - r.Host.NodeMetadatas["hostX"] = report.MakeNodeMetadataWith(map[string]string{ - "ts": time.Now().UTC().Format(time.RFC3339Nano), - "host_name": "host-x", - "local_networks": localNet.String(), - "os": "linux", - }) - } - - return r -} - -func newu64(value uint64) *uint64 { return &value } diff --git a/experimental/demoprobe/main.go b/experimental/demoprobe/main.go index 28548ea936..fbb933cf92 100644 --- a/experimental/demoprobe/main.go +++ b/experimental/demoprobe/main.go @@ -4,53 +4,141 @@ import ( "flag" "fmt" "log" - "os" - "os/signal" + "math/rand" + "net" "strconv" - "syscall" "time" + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/probe/process" + "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" ) func main() { var ( - version = flag.Bool("version", false, "print version number and exit") + publish = flag.String("publish", fmt.Sprintf("localhost:%d", xfer.AppPort), "publish target") publishInterval = flag.Duration("publish.interval", 1*time.Second, "publish (output) interval") - listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address") hostCount = flag.Int("hostcount", 10, "Number of demo hosts to generate") ) flag.Parse() - if len(flag.Args()) != 0 { - flag.Usage() - os.Exit(1) + publisher, err := xfer.NewHTTPPublisher(*publish, "demoprobe") + if err != nil { + log.Fatal(err) } - // -version flag: - if *version { - fmt.Printf("unstable\n") - return + rand.Seed(time.Now().UnixNano()) + for range time.Tick(*publishInterval) { + if err := publisher.Publish(demoReport(*hostCount)); err != nil { + log.Print(err) + } } +} + +func demoReport(nodeCount int) report.Report { + r := report.MakeReport() - publisher, err := xfer.NewTCPPublisher(*listen) + // Make up some plausible IPv4 numbers. + hosts := []string{} + ip := [4]int{192, 168, 1, 1} + for range make([]struct{}, nodeCount) { + hosts = append(hosts, fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])) + ip[3]++ + if ip[3] > 200 { + ip[2]++ + ip[3] = 1 + } + } + hosts = append(hosts, []string{"1.2.3.4", "2.3.4.5"}...) // Some non-local ones, too. + + _, localNet, err := net.ParseCIDR("192.168.0.0/16") if err != nil { - log.Fatal(err) + panic(err) + } + + type conn struct { + srcProc, dstProc string + dstPort int } - defer publisher.Close() - go func() { - for { - publisher.Publish(DemoReport(*hostCount)) - time.Sleep(*publishInterval) + procPool := []conn{ + {srcProc: "curl", dstPort: 80, dstProc: "apache"}, + {srcProc: "wget", dstPort: 80, dstProc: "apache"}, + {srcProc: "curl", dstPort: 80, dstProc: "nginx"}, + {srcProc: "curl", dstPort: 8080, dstProc: "app1"}, + {srcProc: "nginx", dstPort: 8080, dstProc: "app1"}, + {srcProc: "nginx", dstPort: 8080, dstProc: "app2"}, + {srcProc: "nginx", dstPort: 8080, dstProc: "app3"}, + } + connectionCount := nodeCount * 2 + for i := 0; i < connectionCount; i++ { + var ( + c = procPool[rand.Intn(len(procPool))] + src = hosts[rand.Intn(len(hosts))] + dst = hosts[rand.Intn(len(hosts))] + srcPort = rand.Intn(50000) + 10000 + srcPortID = report.MakeEndpointNodeID("", src, strconv.Itoa(srcPort)) + dstPortID = report.MakeEndpointNodeID("", dst, strconv.Itoa(c.dstPort)) + srcID = report.MakeAdjacencyID(srcPortID) + dstID = report.MakeAdjacencyID(dstPortID) + srcAddressID = report.MakeAddressNodeID("", src) + dstAddressID = report.MakeAddressNodeID("", dst) + nodeSrcAddressID = report.MakeAdjacencyID(srcAddressID) + nodeDstAddressID = report.MakeAdjacencyID(dstAddressID) + ) + + // Endpoint topology + if _, ok := r.Endpoint.NodeMetadatas[srcPortID]; !ok { + r.Endpoint.NodeMetadatas[srcPortID] = report.MakeNodeMetadataWith(map[string]string{ + process.PID: "4000", + "name": c.srcProc, + "domain": "node-" + src, + }) + } + r.Endpoint.Adjacency[srcID] = r.Endpoint.Adjacency[srcID].Add(dstPortID) + if _, ok := r.Endpoint.NodeMetadatas[dstPortID]; !ok { + r.Endpoint.NodeMetadatas[dstPortID] = report.MakeNodeMetadataWith(map[string]string{ + process.PID: "4000", + "name": c.dstProc, + "domain": "node-" + dst, + }) + } + r.Endpoint.Adjacency[dstID] = r.Endpoint.Adjacency[dstID].Add(srcPortID) + var ( + edgeKeyEgress = report.MakeEdgeID(srcPortID, dstPortID) + edgeKeyIngress = report.MakeEdgeID(dstPortID, srcPortID) + ) + r.Endpoint.EdgeMetadatas[edgeKeyEgress] = report.EdgeMetadata{ + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), + } + r.Endpoint.EdgeMetadatas[edgeKeyIngress] = report.EdgeMetadata{ + MaxConnCountTCP: newu64(uint64(rand.Intn(100) + 10)), } - }() - log.Printf("%s", <-interrupt()) - log.Printf("Shutting down...") -} + // Address topology + if _, ok := r.Address.NodeMetadatas[srcAddressID]; !ok { + r.Address.NodeMetadatas[srcAddressID] = report.MakeNodeMetadataWith(map[string]string{ + docker.Name: src, + }) + } + r.Address.Adjacency[nodeSrcAddressID] = r.Address.Adjacency[nodeSrcAddressID].Add(dstAddressID) + if _, ok := r.Address.NodeMetadatas[dstAddressID]; !ok { + r.Address.NodeMetadatas[dstAddressID] = report.MakeNodeMetadataWith(map[string]string{ + docker.Name: dst, + }) + } + r.Address.Adjacency[nodeDstAddressID] = r.Address.Adjacency[nodeDstAddressID].Add(srcAddressID) -func interrupt() chan os.Signal { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return c + // Host data + r.Host.NodeMetadatas["hostX"] = report.MakeNodeMetadataWith(map[string]string{ + "ts": time.Now().UTC().Format(time.RFC3339Nano), + "host_name": "host-x", + "local_networks": localNet.String(), + "os": "linux", + }) + } + + return r } + +func newu64(value uint64) *uint64 { return &value } diff --git a/experimental/fixprobe/main.go b/experimental/fixprobe/main.go index 83d2cc5278..6715e4a3c5 100644 --- a/experimental/fixprobe/main.go +++ b/experimental/fixprobe/main.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "os" - "strconv" "time" "github.com/weaveworks/scope/report" @@ -16,35 +15,29 @@ import ( func main() { var ( + publish = flag.String("publish", fmt.Sprintf("localhost:%d", xfer.AppPort), "publish target") publishInterval = flag.Duration("publish.interval", 1*time.Second, "publish (output) interval") - listenAddress = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address") ) flag.Parse() if len(flag.Args()) != 1 { - fmt.Printf("usage: fixprobe [--args] report.json\n") - return + log.Fatal("usage: fixprobe [--args] report.json") } - fixture := flag.Arg(0) - f, err := os.Open(fixture) + f, err := os.Open(flag.Arg(0)) if err != nil { - fmt.Printf("json error: %v\n", err) - return + log.Fatal(err) } var fixedReport report.Report if err := json.NewDecoder(f).Decode(&fixedReport); err != nil { - fmt.Printf("json error: %v\n", err) - return + log.Fatal(err) } + f.Close() - publisher, err := xfer.NewTCPPublisher(*listenAddress) + publisher, err := xfer.NewHTTPPublisher(*publish, "fixprobe") if err != nil { log.Fatal(err) } - defer publisher.Close() - - log.Printf("listening on %s", *listenAddress) for range time.Tick(*publishInterval) { publisher.Publish(fixedReport) diff --git a/experimental/graphviz/handle.go b/experimental/graphviz/handle.go deleted file mode 100644 index 783153212b..0000000000 --- a/experimental/graphviz/handle.go +++ /dev/null @@ -1,115 +0,0 @@ -package main - -import ( - "fmt" - "io" - "net/http" - "os/exec" - "sort" - "strings" - - "github.com/weaveworks/scope/render" - "github.com/weaveworks/scope/report" -) - -func handleTXT(r Reporter) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "text/plain") - - renderer := render.LeafMap{Selector: report.SelectEndpoint, Mapper: mapFunc(req), Pseudo: nil} - dot(w, renderer.Render(r.Report())) - - //report.Render(r.Report(), report.SelectEndpoint, mapFunc(req), report.NoPseudoNode)) - } -} - -func handleSVG(r Reporter) http.HandlerFunc { - return func(w http.ResponseWriter, req *http.Request) { - cmd := exec.Command(engine(req), "-Tsvg") - - wc, err := cmd.StdinPipe() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - cmd.Stdout = w - - renderer := render.LeafMap{Selector: report.SelectEndpoint, Mapper: mapFunc(req), Pseudo: nil} - dot(wc, renderer.Render(r.Report())) - wc.Close() - - w.Header().Set("Content-Type", "image/svg+xml") - if err := cmd.Run(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } -} - -func handleHTML(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/html") - fmt.Fprintf(w, "\n") - fmt.Fprintf(w, ``+"\n") - fmt.Fprintf(w, "\n") - fmt.Fprintf(w, `
`+"\n", r.URL.Query().Encode()) - fmt.Fprintf(w, "\n") -} - -func dot(w io.Writer, m map[string]render.RenderableNode) { - fmt.Fprintf(w, "digraph G {\n") - fmt.Fprintf(w, "\tgraph [ overlap=false ];\n") - fmt.Fprintf(w, "\tnode [ shape=circle, style=filled ];\n") - fmt.Fprintf(w, "\toutputorder=edgesfirst;\n") - fmt.Fprintf(w, "\n") - - // Sorting the nodes seems to stop jumpiness. - nodes := make(sort.StringSlice, 0, len(m)) - for _, node := range m { - nodes = append(nodes, fmt.Sprintf("\t\"%s\" [label=\"%s\n%s\"];\n", node.ID, node.LabelMajor, node.LabelMinor)) - } - sort.Sort(nodes) - for _, s := range nodes { - fmt.Fprint(w, s) - } - fmt.Fprintf(w, "\n") - - // Add ranking information by default. - // Non-dot engines don't seem to be harmed by it. - same := map[string][]string{} - for _, node := range m { - k, v := node.LabelMajor, fmt.Sprintf(`"%s"`, node.ID) - same[k] = append(same[k], v) - } - for _, ids := range same { - fmt.Fprintf(w, "\t{ rank=same; %s }\n", strings.Join(ids, " ")) - } - fmt.Fprintf(w, "\n") - - for _, src := range m { - for _, dstID := range src.Adjacency { - fmt.Fprintf(w, "\t\"%s\" -> \"%s\";\n", src.ID, dstID) - } - } - fmt.Fprintf(w, "}\n") -} - -func engine(r *http.Request) string { - engine := r.FormValue("engine") - if engine == "" { - engine = "dot" - } - return engine -} - -func mapFunc(r *http.Request) render.LeafMapFunc { - switch strings.ToLower(r.FormValue("map_func")) { - case "hosts", "networkhost", "networkhostname": - return render.MapAddressIdentity - } - return render.MapProcessIdentity -} - -func classView(r *http.Request) bool { - return r.FormValue("class_view") == "true" -} diff --git a/experimental/graphviz/main.go b/experimental/graphviz/main.go deleted file mode 100644 index df57ceb2d5..0000000000 --- a/experimental/graphviz/main.go +++ /dev/null @@ -1,55 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "net/http" - "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "github.com/weaveworks/scope/xfer" -) - -func main() { - var ( - defaultProbes = fmt.Sprintf("localhost:%d", xfer.ProbePort) - probes = flag.String("probes", defaultProbes, "list of probe endpoints, comma separated") - batch = flag.Duration("batch", 1*time.Second, "batch interval") - window = flag.Duration("window", 15*time.Second, "window") - listen = flag.String("http.address", ":"+strconv.Itoa(xfer.AppPort), "webserver listen address") - ) - flag.Parse() - - xfer.MaxBackoff = 10 * time.Second - c := xfer.NewCollector(*batch, "id") - for _, addr := range strings.Split(*probes, ",") { - c.Add(addr) - } - defer c.Stop() - lifo := NewReportLIFO(c, *window) - defer lifo.Stop() - - http.Handle("/svg", handleSVG(lifo)) - http.Handle("/txt", handleTXT(lifo)) - http.Handle("/", http.HandlerFunc(handleHTML)) - - irq := interrupt() - go func() { - log.Printf("listening on %s", *listen) - log.Print(http.ListenAndServe(*listen, nil)) - irq <- syscall.SIGINT - }() - <-irq - log.Printf("shutting down") -} - -func interrupt() chan os.Signal { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return c -} diff --git a/experimental/graphviz/report_lifo.go b/experimental/graphviz/report_lifo.go deleted file mode 100644 index 4699c3187c..0000000000 --- a/experimental/graphviz/report_lifo.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "time" - - "github.com/weaveworks/scope/report" -) - -// Copy/paste from app/report_lifo.go - -// Reporter XXX -type Reporter interface { - Report() report.Report -} - -type timedReport struct { - report.Report - Timestamp time.Time -} - -// ReportLIFO XXX -type ReportLIFO struct { - reports []timedReport - requests chan chan report.Report - quit chan chan struct{} -} - -type reporter interface { - Reports() <-chan report.Report -} - -// NewReportLIFO XXX -func NewReportLIFO(r reporter, maxAge time.Duration) *ReportLIFO { - l := ReportLIFO{ - reports: []timedReport{}, - requests: make(chan chan report.Report), - quit: make(chan chan struct{}), - } - - go func() { - for { - select { - case report := <-r.Reports(): - tr := timedReport{ - Timestamp: time.Now(), - Report: report, - } - l.reports = append(l.reports, tr) - l.reports = cleanOld(l.reports, time.Now().Add(-maxAge)) - - case req := <-l.requests: - report := report.MakeReport() - for _, r := range l.reports { - report.Merge(r.Report) - } - req <- report - - case q := <-l.quit: - close(q) - return - } - } - }() - return &l -} - -// Stop XXX -func (r *ReportLIFO) Stop() { - q := make(chan struct{}) - r.quit <- q - <-q -} - -// Report XXX -func (r *ReportLIFO) Report() report.Report { - req := make(chan report.Report) - r.requests <- req - return <-req -} - -func cleanOld(reports []timedReport, threshold time.Time) []timedReport { - res := make([]timedReport, 0, len(reports)) - for _, tr := range reports { - if tr.Timestamp.Before(threshold) { - continue - } - res = append(res, tr) - } - return res -} diff --git a/experimental/oneshot/Makefile b/experimental/oneshot/Makefile deleted file mode 100644 index 70334ba496..0000000000 --- a/experimental/oneshot/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -.PHONY: all vet lint build test clean - -all: build test vet lint - -vet: - go vet ./... - -lint: - golint . - -build: - go build - -test: - go test - -clean: - go clean - diff --git a/experimental/oneshot/main.go b/experimental/oneshot/main.go deleted file mode 100644 index 5404761c38..0000000000 --- a/experimental/oneshot/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - _ "net/http/pprof" - "os" - "os/signal" - "strings" - "syscall" - "time" - - "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/xfer" -) - -func main() { - var ( - defaultProbes = fmt.Sprintf("localhost:%d", xfer.ProbePort) - probes = flag.String("probes", defaultProbes, "list of probe endpoints, comma separated") - ) - flag.Parse() - if len(flag.Args()) != 0 { - flag.Usage() - os.Exit(1) - } - - // Collector deals with the probes, and generates merged reports. - xfer.MaxBackoff = 1 * time.Second - c := xfer.NewCollector(1*time.Second, "id") - for _, addr := range strings.Split(*probes, ",") { - c.Add(addr) - } - defer c.Stop() - - report := report.MakeReport() - irq := interrupt() -OUTER: - for { - select { - case r := <-c.Reports(): - report.Merge(r) - case <-irq: - break OUTER - } - } - - b, err := json.Marshal(report) - if err != nil { - panic(err) - } - fmt.Print(string(b)) -} - -func interrupt() chan os.Signal { - c := make(chan os.Signal) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - return c -} diff --git a/probe/main.go b/probe/main.go index b1e5864d28..0383384f06 100644 --- a/probe/main.go +++ b/probe/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "log" "net" "net/http" @@ -9,7 +10,6 @@ import ( "os" "os/signal" "runtime" - "strconv" "strings" "syscall" "time" @@ -29,10 +29,11 @@ var version = "dev" // set at build time func main() { var ( + targets = []string{fmt.Sprintf("localhost:%d", xfer.AppPort), fmt.Sprintf("scope.weave.local:%d", xfer.AppPort)} + token = flag.String("token", "default-token", "probe token") httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server") publishInterval = flag.Duration("publish.interval", 3*time.Second, "publish (output) interval") spyInterval = flag.Duration("spy.interval", time.Second, "spy (scan) interval") - listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address") prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)") spyProcs = flag.Bool("processes", true, "report processes (needs root)") dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes") @@ -46,14 +47,13 @@ func main() { captureOff = flag.Duration("capture.off", 5*time.Second, "packet capture duty cycle 'off'") ) flag.Parse() - log.SetFlags(log.Lmicroseconds) - if len(flag.Args()) != 0 { - flag.Usage() - os.Exit(1) - } + log.Printf("probe starting, version %s", version) - log.Printf("probe version %s", version) + if len(flag.Args()) > 0 { + targets = flag.Args() + } + log.Printf("publishing to: %s", strings.Join(targets, ", ")) procspy.SetProcRoot(*procRoot) @@ -74,11 +74,10 @@ func main() { log.Printf("warning: process reporting enabled, but that requires root to find everything") } - publisher, err := xfer.NewTCPPublisher(*listen) - if err != nil { - log.Fatal(err) - } - defer publisher.Close() + publisherFactory := func(target string) (xfer.Publisher, error) { return xfer.NewHTTPPublisher(target, *token) } + publishers := xfer.NewMultiPublisher(publisherFactory) + resolver := newStaticResolver(targets, publishers.Add) + defer resolver.Stop() addrs, err := net.InterfaceAddrs() if err != nil { @@ -147,8 +146,6 @@ func main() { } } - log.Printf("listening on %s", *listen) - quit := make(chan struct{}) defer close(quit) go func() { @@ -163,7 +160,9 @@ func main() { case <-pubTick: publishTicks.WithLabelValues().Add(1) r.Window = *publishInterval - publisher.Publish(r) + if err := publishers.Publish(r); err != nil { + log.Printf("publish: %v", err) + } r = report.MakeReport() case <-spyTick: @@ -184,7 +183,6 @@ func main() { } } }() - log.Printf("%s", <-interrupt()) } diff --git a/app/resolver.go b/probe/resolver.go similarity index 97% rename from app/resolver.go rename to probe/resolver.go index dc75c3bf1d..8b8c7e97e2 100644 --- a/app/resolver.go +++ b/probe/resolver.go @@ -57,7 +57,7 @@ func prepareNames(strs []string) []peer { continue } } else { - hostname, port = s, strconv.Itoa(xfer.ProbePort) + hostname, port = s, strconv.Itoa(xfer.AppPort) } results = append(results, peer{hostname, port}) diff --git a/app/resolver_test.go b/probe/resolver_test.go similarity index 87% rename from app/resolver_test.go rename to probe/resolver_test.go index 6e47fe6b2f..0958dcde14 100644 --- a/app/resolver_test.go +++ b/probe/resolver_test.go @@ -43,26 +43,26 @@ func TestResolver(t *testing.T) { t.Errorf("line %d: want %q, have %q", line, want, have) } case <-time.After(time.Millisecond): - t.Fatalf("line %d: didn't get add in time", line) + t.Errorf("line %d: didn't get add in time", line) } } // Initial resolve should just give us IPs assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.ProbePort)) + assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) // Trigger another resolve with a tick; again, // just want ips. c <- time.Now() assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.ProbePort)) + assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip3 := "1.2.3.4" ips = map[string][]net.IP{"symbolic.name": makeIPs(ip3)} c <- time.Now() // trigger a resolve assertAdd(ip3 + port) // we want 1 add assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.ProbePort)) + assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) ip4 := "10.10.10.10" ips = map[string][]net.IP{"symbolic.name": makeIPs(ip3, ip4)} @@ -70,7 +70,7 @@ func TestResolver(t *testing.T) { assertAdd(ip3 + port) // first add assertAdd(ip4 + port) // second add assertAdd(ip1 + port) - assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.ProbePort)) + assertAdd(fmt.Sprintf("%s:%d", ip2, xfer.AppPort)) done := make(chan struct{}) go func() { r.Stop(); close(done) }() diff --git a/xfer/collector.go b/xfer/collector.go index 9333ee1068..a19d684ac0 100644 --- a/xfer/collector.go +++ b/xfer/collector.go @@ -1,233 +1,79 @@ package xfer import ( - "encoding/gob" - "log" - "net" "sync" "time" "github.com/weaveworks/scope/report" ) -const ( - connectTimeout = 10 * time.Second - initialBackoff = 2 * time.Second -) - -var ( - // MaxBackoff is the maximum time between connect retries. - // It's exported so it's externally configurable. - MaxBackoff = 1 * time.Minute - - // This is extracted out for mocking. - tick = time.Tick -) - -// Collector describes anything that can have addresses added and removed, and -// which produces reports that represent aggregate reports from all collected -// addresses. -type Collector interface { - Add(string) - Remove(string) - Reports() <-chan report.Report - Stop() +// Reporter is something that can produce reports on demand. It's a convenient +// interface for parts of the app, and several experimental components. +type Reporter interface { + Report() report.Report } -// realCollector connects to probes over TCP and merges reports published by those -// probes into a single one. -type realCollector struct { - in chan report.Report - out chan report.Report - peekc chan chan report.Report - add chan string - remove chan string - quit chan struct{} - id string +// Adder is something that can accept reports. It's a convenient interface for +// parts of the app, and several experimental components. +type Adder interface { + Add(report.Report) } -// NewCollector produces and returns a report collector. -func NewCollector(batchTime time.Duration, id string) Collector { - c := &realCollector{ - in: make(chan report.Report), - out: make(chan report.Report), - peekc: make(chan chan report.Report), - add: make(chan string), - remove: make(chan string), - quit: make(chan struct{}), - id: id, - } - go c.loop(batchTime) - return c +// Collector receives published reports from multiple producers. It yields a +// single merged report, representing all collected reports. +type Collector struct { + mtx sync.Mutex + reports []timestampReport + window time.Duration } -func (c *realCollector) loop(batchTime time.Duration) { - var ( - tick = tick(batchTime) - current = report.MakeReport() - addrs = map[string]chan struct{}{} - wg = &sync.WaitGroup{} // per-address goroutines - ) - - add := func(ip string) { - if _, ok := addrs[ip]; ok { - return - } - addrs[ip] = make(chan struct{}) - wg.Add(1) - go func(quit chan struct{}) { - defer wg.Done() - c.reportCollector(ip, quit) - }(addrs[ip]) - } - - remove := func(ip string) { - q, ok := addrs[ip] - if !ok { - return // hmm - } - close(q) - delete(addrs, ip) - } - - for { - select { - case <-tick: - c.out <- current - current = report.MakeReport() - - case pc := <-c.peekc: - copy := report.MakeReport() - copy.Merge(current) - pc <- copy - - case r := <-c.in: - if err := r.Validate(); err != nil { - log.Printf("Received invalid report: %v", err) - continue - } - current.Merge(r) - - case ip := <-c.add: - add(ip) - - case ip := <-c.remove: - remove(ip) - - case <-c.quit: - for _, q := range addrs { - close(q) - } - wg.Wait() - return - } +// NewCollector returns a collector ready for use. +func NewCollector(window time.Duration) *Collector { + return &Collector{ + window: window, } } -// Add adds an address to be collected from. -func (c *realCollector) Add(addr string) { - c.add <- addr -} +var now = time.Now -// Remove removes a previously-added address. -func (c *realCollector) Remove(addr string) { - c.remove <- addr +// Add adds a report to the collector's internal state. It implements Adder. +func (c *Collector) Add(rpt report.Report) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.reports = append(c.reports, timestampReport{now(), rpt}) + c.reports = clean(c.reports, c.window) } -// Reports returns the report chan. It must be consumed by the client, or the -// collector will break. -func (c *realCollector) Reports() <-chan report.Report { - return c.out -} +// Report returns a merged report over all added reports. It implements +// Reporter. +func (c *Collector) Report() report.Report { + c.mtx.Lock() + defer c.mtx.Unlock() -func (c *realCollector) peek() report.Report { - pc := make(chan report.Report) - c.peekc <- pc - return <-pc -} + c.reports = clean(c.reports, c.window) -// Stop terminates the collector. -func (c *realCollector) Stop() { - close(c.quit) + report := report.MakeReport() + for _, tr := range c.reports { + report.Merge(tr.report) + } + return report } -// reportCollector is the loop to connect to a single Probe. It'll keep -// running until the quit channel is closed. -func (c *realCollector) reportCollector(ip string, quit <-chan struct{}) { - backoff := initialBackoff / 2 - for { - backoff *= 2 - if backoff > MaxBackoff { - backoff = MaxBackoff - } - - select { - default: - case <-quit: - return - } - - log.Printf("dialing %v (backoff %v)", ip, backoff) - - conn, err := net.DialTimeout("tcp", ip, connectTimeout) - if err != nil { - log.Print(err) - select { - case <-time.After(backoff): - continue - case <-quit: - return - } - } - - log.Printf("connected to %v", ip) - - go func() { - <-quit - log.Printf("closing %v collector", ip) - conn.Close() - }() - - // Connection accepted. - if err := gob.NewEncoder(conn).Encode(HandshakeRequest{ID: c.id}); err != nil { - log.Printf("handshake error: %v", err) - break - } - - dec := gob.NewDecoder(conn) - for { - var report report.Report - err := dec.Decode(&report) - // Don't complain of errors when shutting down. - select { - default: - case <-quit: - return - } - if err != nil { - log.Printf("decode error: %v", err) - break - } - - select { - case c.in <- report: - case <-quit: - return - } - - // Reset the backoff iff we have a connection which works. This - // prevents us from spamming probes with multiple addresses (since - // the probe closes everything but a single connection). - backoff = initialBackoff - } +type timestampReport struct { + timestamp time.Time + report report.Report +} - // Prevent a 100% CPU loop when the probe is closing the - // connection right away (which happens on a probe which already - // has a client) - select { - case <-time.After(backoff): - case <-quit: - return +func clean(reports []timestampReport, window time.Duration) []timestampReport { + var ( + cleaned = make([]timestampReport, 0, len(reports)) + oldest = now().Add(-window) + ) + for _, tr := range reports { + if tr.timestamp.Before(oldest) { + continue } + cleaned = append(cleaned, tr) } + return cleaned } diff --git a/xfer/collector_internal_test.go b/xfer/collector_internal_test.go deleted file mode 100644 index 614dad1a69..0000000000 --- a/xfer/collector_internal_test.go +++ /dev/null @@ -1,103 +0,0 @@ -package xfer - -import ( - "encoding/gob" - "io/ioutil" - "log" - "net" - "reflect" - "runtime" - "testing" - "time" - - "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/test" -) - -func TestCollector(t *testing.T) { - log.SetOutput(ioutil.Discard) - - // Swap out ticker - publish := make(chan time.Time) - oldTick := tick - tick = func(time.Duration) <-chan time.Time { return publish } - defer func() { tick = oldTick }() - - // Build a collector - collector := NewCollector(time.Second, "id") - defer collector.Stop() - - concreteCollector, ok := collector.(*realCollector) - if !ok { - t.Fatal("type assertion failure") - } - - // Build a test publisher - reports := make(chan interface{}) - ln := testPublisher(t, reports) - defer ln.Close() - - // Connect the collector to the test publisher - addr := ln.Addr().String() - collector.Add(addr) - collector.Add(addr) // test duplicate case - runtime.Gosched() // make sure it connects - - // Push a report through everything - r := report.Report{ - Address: report.Topology{ - NodeMetadatas: report.NodeMetadatas{ - report.MakeAddressNodeID("a", "b"): report.MakeNodeMetadata(), - }, - }, - } - - reports <- r - test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return len(concreteCollector.peek().Address.NodeMetadatas) - }) - - go func() { publish <- time.Now() }() - collected := <-collector.Reports() - if reflect.DeepEqual(r, collected) { - t.Errorf(test.Diff(r, collected)) - } - - collector.Remove(addr) - collector.Remove(addr) // test duplicate case -} - -func TestCollectorQuitWithActiveConnections(t *testing.T) { - c := NewCollector(time.Second, "id") - c.Add("1.2.3.4:56789") - c.Stop() -} - -func testPublisher(t *testing.T, input <-chan interface{}) net.Listener { - addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - ln, err := net.ListenTCP("tcp4", addr) - if err != nil { - t.Fatal(err) - } - go func() { - conn, err := ln.Accept() - if err != nil { - t.Log(err) - return - } - defer conn.Close() - for { - enc := gob.NewEncoder(conn) - for v := range input { - if err := enc.Encode(v); err != nil { - t.Error(err) - return - } - } - } - }() - return ln -} diff --git a/xfer/collector_test.go b/xfer/collector_test.go new file mode 100644 index 0000000000..f8fc98508a --- /dev/null +++ b/xfer/collector_test.go @@ -0,0 +1,41 @@ +package xfer_test + +import ( + "reflect" + "time" + + "github.com/weaveworks/scope/test" + + "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/xfer" + + "testing" +) + +func TestCollector(t *testing.T) { + window := time.Millisecond + c := xfer.NewCollector(window) + + r1 := report.MakeReport() + r1.Endpoint.NodeMetadatas["foo"] = report.MakeNodeMetadata() + + r2 := report.MakeReport() + r2.Endpoint.NodeMetadatas["bar"] = report.MakeNodeMetadata() + + if want, have := report.MakeReport(), c.Report(); !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) + } + + c.Add(r1) + if want, have := r1, c.Report(); !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) + } + + c.Add(r2) + merged := report.MakeReport() + merged.Merge(r1) + merged.Merge(r2) + if want, have := merged, c.Report(); !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) + } +} diff --git a/xfer/merge_test.go b/xfer/merge_test.go deleted file mode 100644 index 00aff01589..0000000000 --- a/xfer/merge_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package xfer_test - -import ( - "io/ioutil" - "log" - "testing" - "time" - - "github.com/weaveworks/scope/report" - "github.com/weaveworks/scope/xfer" -) - -func TestMerge(t *testing.T) { - log.SetOutput(ioutil.Discard) - - var ( - p1Addr = "localhost:7888" - p2Addr = "localhost:7889" - ) - - p1, err := xfer.NewTCPPublisher(p1Addr) - if err != nil { - t.Fatal(err) - } - defer p1.Close() - - p2, err := xfer.NewTCPPublisher(p2Addr) - if err != nil { - t.Fatal(err) - } - defer p2.Close() - - batchTime := 100 * time.Millisecond - c := xfer.NewCollector(batchTime, "id") - c.Add(p1Addr) - c.Add(p2Addr) - defer c.Stop() - time.Sleep(batchTime / 10) // connect - - k1, k2 := report.MakeHostNodeID("p1"), report.MakeHostNodeID("p2") - - { - r := report.MakeReport() - r.Host.NodeMetadatas[k1] = report.MakeNodeMetadataWith(map[string]string{"host_name": "test1"}) - p1.Publish(r) - } - { - r := report.MakeReport() - r.Host.NodeMetadatas[k2] = report.MakeNodeMetadataWith(map[string]string{"host_name": "test2"}) - p2.Publish(r) - } - - success := make(chan struct{}) - go func() { - defer close(success) - for r := range c.Reports() { - if r.Host.NodeMetadatas[k1].Metadata["host_name"] != "test1" { - continue - } - if r.Host.NodeMetadatas[k2].Metadata["host_name"] != "test2" { - continue - } - return - } - }() - - select { - case <-success: - case <-time.After(2 * batchTime): - t.Errorf("collector didn't capture both reports") - } -} diff --git a/xfer/ports.go b/xfer/ports.go index bd2fbe5d22..11e1e074aa 100644 --- a/xfer/ports.go +++ b/xfer/ports.go @@ -1,10 +1,8 @@ package xfer var ( - // ProbePort is the default port that the probe(s) will listen on to - // publish reports. - ProbePort = 4030 - // AppPort is the default port that the app will use for its HTTP server. + // The app publishes the API and user interface, and receives reports from + // probes, on this port. AppPort = 4040 ) diff --git a/xfer/publisher.go b/xfer/publisher.go index d8c3294c26..6b457b314e 100644 --- a/xfer/publisher.go +++ b/xfer/publisher.go @@ -1,132 +1,125 @@ package xfer import ( + "bytes" "encoding/gob" - "io" + "fmt" "log" - "net" + "net/http" + "net/url" + "strings" + "sync" "github.com/weaveworks/scope/report" ) -// Publisher provides a way to send reports upstream. +// Publisher is something which can send a report to a remote collector. type Publisher interface { - Publish(report.Report) - Close() + Publish(report.Report) error } -// TCPPublisher is a Publisher implementation which uses TCP and gob encoding. -type TCPPublisher struct { - msg chan report.Report - closer io.Closer +// HTTPPublisher publishes reports by POST to a fixed endpoint. +type HTTPPublisher struct { + url string + token string } -// HandshakeRequest contains the unique ID of the connecting app. -type HandshakeRequest struct { - ID string -} - -// NewTCPPublisher listens for connections on listenAddress. Only one client -// is accepted at a time; other clients are accepted, but disconnected right -// away. Reports published via publish() will be written to the connected -// client, if any. Gentle shutdown of the returned publisher via close(). -func NewTCPPublisher(listenAddress string) (*TCPPublisher, error) { - listener, err := net.Listen("tcp", listenAddress) +// NewHTTPPublisher returns an HTTPPublisher ready for use. +func NewHTTPPublisher(target, token string) (*HTTPPublisher, error) { + if !strings.HasPrefix(target, "http") { + target = "http://" + target + } + u, err := url.Parse(target) if err != nil { return nil, err } - - p := &TCPPublisher{ - msg: make(chan report.Report), - closer: listener, + if u.Path == "" { + u.Path = "/api/report" } + return &HTTPPublisher{ + url: u.String(), + token: token, + }, nil +} - go p.loop(fwd(listener)) - - return p, nil +// Publish publishes the report to the URL. +func (p HTTPPublisher) Publish(rpt report.Report) error { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(rpt); err != nil { + return err + } + req, err := http.NewRequest("POST", p.url, &buf) + if err != nil { + return err + } + req.Header.Set("Authorization", AuthorizationHeader(p.token)) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf(resp.Status) + } + return nil } -// Close stops a TCPPublisher and closes the socket. -func (p *TCPPublisher) Close() { - close(p.msg) - p.closer.Close() +// AuthorizationHeader returns a value suitable for an HTTP Authorization +// header, based on the passed token string. +func AuthorizationHeader(token string) string { + return fmt.Sprintf("Scope-Probe token=%s", token) } -// Publish sens a Report to the client, if any. -func (p *TCPPublisher) Publish(msg report.Report) { - p.msg <- msg +// MultiPublisher implements Publisher over a set of publishers. +type MultiPublisher struct { + mtx sync.RWMutex + factory func(string) (Publisher, error) + m map[string]Publisher } -func (p *TCPPublisher) loop(incoming <-chan net.Conn) { - type connEncoder struct { - net.Conn - *gob.Encoder +// NewMultiPublisher returns a new MultiPublisher ready for use. The factory +// should be e.g. NewHTTPPublisher, except you need to curry it over the +// probe token. +func NewMultiPublisher(factory func(string) (Publisher, error)) *MultiPublisher { + return &MultiPublisher{ + factory: factory, + m: map[string]Publisher{}, } +} - activeConns := map[string]connEncoder{} // host: connEncoder - - for { - select { - case conn, ok := <-incoming: - if !ok { - return // someone closed our connection chan -- weird? - } - - // Don't allow multiple connections from the same remote host. - listenerID, err := getListenerID(conn) - if err != nil { - log.Printf("incoming connection: %s: %v (dropped)", conn.RemoteAddr(), err) - conn.Close() - continue - } - if _, ok := activeConns[listenerID]; ok { - log.Printf("duplicate connection: %s (dropped)", conn.RemoteAddr()) - conn.Close() - continue - } - - log.Printf("connection initiated: %s (%s)", conn.RemoteAddr(), listenerID) - activeConns[listenerID] = connEncoder{conn, gob.NewEncoder(conn)} - - case msg, ok := <-p.msg: - if !ok { - return // someone closed our msg chan, so we're done - } - - for host, connEncoder := range activeConns { - if err := connEncoder.Encoder.Encode(msg); err != nil { - log.Printf("connection terminated: %s: %v", connEncoder.Conn.RemoteAddr(), err) - connEncoder.Conn.Close() - delete(activeConns, host) - } - } - } +// Add allows additional targets to be added dynamically. It will dedupe +// identical targets. TODO we have no good mechanism to remove. +func (p *MultiPublisher) Add(target string) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if _, ok := p.m[target]; ok { + return } -} -func getListenerID(c net.Conn) (string, error) { - var req HandshakeRequest - if err := gob.NewDecoder(c).Decode(&req); err != nil { - return "", err + publisher, err := p.factory(target) + if err != nil { + log.Printf("multi-publisher: %v", err) + return } - return req.ID, nil + p.m[target] = publisher } -func fwd(ln net.Listener) chan net.Conn { - c := make(chan net.Conn) - - go func() { - defer close(c) - for { - conn, err := ln.Accept() - if err != nil { - log.Printf("%s: %s", ln.Addr(), err) - return - } - c <- conn +// Publish implements Publisher by emitting the report to all publishers. +func (p *MultiPublisher) Publish(rpt report.Report) error { + p.mtx.RLock() + defer p.mtx.RUnlock() + + var errs []string + for _, publisher := range p.m { + if err := publisher.Publish(rpt); err != nil { + errs = append(errs, err.Error()) } - }() + } - return c + if len(errs) > 0 { + return fmt.Errorf(strings.Join(errs, "; ")) + } + return nil } diff --git a/xfer/publisher_test.go b/xfer/publisher_test.go index 12fed6ac0d..2618bf34a0 100644 --- a/xfer/publisher_test.go +++ b/xfer/publisher_test.go @@ -2,120 +2,72 @@ package xfer_test import ( "encoding/gob" - "fmt" - "io/ioutil" - "log" - "net" + "net/http" + "net/http/httptest" + "reflect" "testing" - "time" "github.com/weaveworks/scope/report" + "github.com/weaveworks/scope/test" "github.com/weaveworks/scope/xfer" ) -func TestTCPPublisher(t *testing.T) { - log.SetOutput(ioutil.Discard) - - // Choose a port - port, err := getFreePort() - if err != nil { - t.Fatal(err) - } - - // Start a publisher - p, err := xfer.NewTCPPublisher(port) +func TestHTTPPublisher(t *testing.T) { + var ( + token = "abcdefg" + rpt = report.MakeReport() + ) + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if want, have := xfer.AuthorizationHeader(token), r.Header.Get("Authorization"); want != have { + t.Errorf("want %q, have %q", want, have) + } + var have report.Report + if err := gob.NewDecoder(r.Body).Decode(&have); err != nil { + t.Error(err) + return + } + if want := rpt; !reflect.DeepEqual(want, have) { + t.Error(test.Diff(want, have)) + return + } + w.WriteHeader(http.StatusOK) + })) + defer s.Close() + + p, err := xfer.NewHTTPPublisher(s.URL, token) if err != nil { t.Fatal(err) } - defer p.Close() - - // Start a raw listener - conn, err := net.Dial("tcp4", "127.0.0.1"+port) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - time.Sleep(time.Millisecond) - - // Send handshake - if err := gob.NewEncoder(conn).Encode(xfer.HandshakeRequest{ID: "foo"}); err != nil { - t.Fatal(err) - } - - // Publish a message - p.Publish(report.Report{}) - - // Check it was received - var r report.Report - if err := gob.NewDecoder(conn).Decode(&r); err != nil { - t.Fatal(err) + if err := p.Publish(rpt); err != nil { + t.Error(err) } } -func TestPublisherClosesDuplicateConnections(t *testing.T) { - log.SetOutput(ioutil.Discard) +func TestMultiPublisher(t *testing.T) { + var ( + p = &mockPublisher{} + factory = func(string) (xfer.Publisher, error) { return p, nil } + multiPublisher = xfer.NewMultiPublisher(factory) + ) - // Choose a port - port, err := getFreePort() - if err != nil { - t.Fatal(err) + multiPublisher.Add("first") + if err := multiPublisher.Publish(report.MakeReport()); err != nil { + t.Error(err) } - - // Start a publisher - p, err := xfer.NewTCPPublisher(port) - if err != nil { - t.Fatal(err) + if want, have := 1, p.count; want != have { + t.Errorf("want %d, have %d", want, have) } - defer p.Close() - // Connect a listener - conn, err := net.Dial("tcp4", "127.0.0.1"+port) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - if err := gob.NewEncoder(conn).Encode(xfer.HandshakeRequest{ID: "foo"}); err != nil { - t.Fatal(err) - } - time.Sleep(time.Millisecond) - - // Try to connect the same listener - dupconn, err := net.Dial("tcp4", "127.0.0.1"+port) - if err != nil { - t.Fatal(err) - } - // Send handshake - if err := gob.NewEncoder(dupconn).Encode(xfer.HandshakeRequest{ID: "foo"}); err != nil { - t.Fatal(err) - } - defer dupconn.Close() - - // Publish a message - p.Publish(report.Report{}) - - // The first listener should receive it - var r report.Report - if err := gob.NewDecoder(conn).Decode(&r); err != nil { - t.Fatal(err) + multiPublisher.Add("second") // but factory returns same mockPublisher + if err := multiPublisher.Publish(report.MakeReport()); err != nil { + t.Error(err) } - - // The duplicate listener should have an error - if err := gob.NewDecoder(dupconn).Decode(&r); err == nil { - t.Errorf("expected error, got none") - } else { - t.Logf("dupconn got expected error: %v", err) + if want, have := 3, p.count; want != have { + t.Errorf("want %d, have %d", want, have) } } -func getFreePort() (string, error) { - ln, err := net.Listen("tcp4", ":0") - if err != nil { - return "", fmt.Errorf("Listen: %v", err) - } - defer ln.Close() - _, port, err := net.SplitHostPort(ln.Addr().String()) - if err != nil { - return "", fmt.Errorf("SplitHostPort(%s): %v", ln.Addr().String(), err) - } - return ":" + port, nil -} +type mockPublisher struct{ count int } + +func (p *mockPublisher) Publish(report.Report) error { p.count++; return nil }