From db3ff8bb7c75dfd7dcc69eefe04e9686dce04f45 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 16 Nov 2016 16:45:26 -0500 Subject: [PATCH 1/3] Add monitor http endpoint --- api/agent.go | 35 ++++++++ api/agent_test.go | 24 ++++++ command/agent/agent.go | 4 + command/agent/agent_endpoint.go | 82 +++++++++++++++++++ command/agent/agent_endpoint_test.go | 69 ++++++++++++++++ command/agent/command.go | 1 + command/agent/http.go | 1 + command/agent/http_test.go | 6 +- .../docs/agent/http/agent.html.markdown | 14 +++- 9 files changed, 234 insertions(+), 2 deletions(-) diff --git a/api/agent.go b/api/agent.go index 41d8bc0b1f66..934c8dbd0c64 100644 --- a/api/agent.go +++ b/api/agent.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "fmt" ) @@ -410,3 +411,37 @@ func (a *Agent) DisableNodeMaintenance() error { resp.Body.Close() return nil } + +// Monitor returns a channel which will receive streaming logs from the agent +// Providing a non-nil stopCh can be used to close the connection and stop the +// log stream +func (a *Agent) Monitor(loglevel string, stopCh chan struct{}) (chan string, error) { + r := a.c.newRequest("GET", "/v1/agent/monitor") + if loglevel != "" { + r.params.Add("loglevel", loglevel) + } + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + + logCh := make(chan string, 64) + go func() { + defer resp.Body.Close() + + scanner := bufio.NewScanner(resp.Body) + for { + select { + case <-stopCh: + close(logCh) + return + default: + } + if scanner.Scan() { + logCh <- scanner.Text() + } + } + }() + + return logCh, nil +} diff --git a/api/agent_test.go b/api/agent_test.go index 215d240dc906..970380565fc7 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -3,6 +3,7 @@ package api import ( "strings" "testing" + "time" ) func TestAgent_Self(t *testing.T) { @@ -558,6 +559,29 @@ func TestAgent_ForceLeave(t *testing.T) { } } +func TestAgent_Monitor(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + agent := c.Agent() + + logCh, err := agent.Monitor("info", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the first log message and validate it + select { + case log := <-logCh: + if !strings.Contains(log, "[INFO] raft: Initial configuration") { + t.Fatalf("bad: %q", log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get a log message") + } +} + func TestServiceMaintenance(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/agent/agent.go b/command/agent/agent.go index 54e2b0c9ee96..6bee6fba071a 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/coordinate" @@ -66,6 +67,9 @@ type Agent struct { // Output sink for logs logOutput io.Writer + // Used for streaming logs to + logWriter *logger.LogWriter + // We have one of a client or a server, depending // on our configuration server *consul.Server diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index db33256af228..fd4586b20b09 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -2,12 +2,15 @@ package agent import ( "fmt" + "log" "net/http" "strconv" "strings" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" + "github.com/hashicorp/logutils" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -393,6 +396,61 @@ func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Re return nil, nil } +func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Only GET supported + if req.Method != "GET" { + resp.WriteHeader(405) + return nil, nil + } + + // Get the provided loglevel + logLevel := req.URL.Query().Get("loglevel") + if logLevel == "" { + logLevel = "INFO" + } + + // Upper case the log level + logLevel = strings.ToUpper(logLevel) + + // Create a level filter + filter := logger.LevelFilter() + filter.MinLevel = logutils.LogLevel(logLevel) + if !logger.ValidateLevelFilter(filter.MinLevel, filter) { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Unknown log level: %s", filter.MinLevel))) + return nil, nil + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + // Set up a log handler + handler := &httpLogHandler{ + filter: filter, + logCh: make(chan string, 512), + logger: s.logger, + } + s.agent.logWriter.RegisterHandler(handler) + defer s.agent.logWriter.DeregisterHandler(handler) + + notify := resp.(http.CloseNotifier).CloseNotify() + + // Stream logs until the connection is closed + for { + select { + case <-notify: + return nil, nil + case log := <-handler.logCh: + resp.Write([]byte(log + "\n")) + flusher.Flush() + } + } + + return nil, nil +} + // syncChanges is a helper function which wraps a blocking call to sync // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. @@ -401,3 +459,27 @@ func (s *HTTPServer) syncChanges() { s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } + +type httpLogHandler struct { + filter *logutils.LevelFilter + logCh chan string + logger *log.Logger +} + +func (h *httpLogHandler) HandleLog(log string) { + // Check the log level + if !h.filter.Check([]byte(log)) { + return + } + + // Do a non-blocking send + select { + case h.logCh <- log: + default: + // We can't log synchronously, since we are already being invoked + // from the logWriter, and a log will need to invoke Write() which + // already holds the lock. We must therefor do the log async, so + // as to not deadlock + go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint") + } +} diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index a29b8ae9d2c5..6bc875a3040f 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -1,8 +1,10 @@ package agent import ( + "bytes" "errors" "fmt" + "io" "net/http" "net/http/httptest" "os" @@ -12,6 +14,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" @@ -1019,3 +1022,69 @@ func TestHTTPAgentRegisterServiceCheck(t *testing.T) { t.Fatalf("bad: %#v", result["memcached_check2"]) } } + +func TestHTTPAgent_Monitor(t *testing.T) { + logWriter := logger.NewLogWriter(512) + expectedLogs := bytes.Buffer{} + logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) + + dir, srv := makeHTTPServerWithConfigLog(t, nil, logger) + srv.agent.logWriter = logWriter + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Begin streaming logs from the monitor endpoint + req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + resp := newClosableRecorder() + go func() { + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %s", err) + } + }() + + // Write the incoming logs to a channel for reading + logCh := make(chan string, 0) + go func() { + for { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + t.Fatalf("err: %v", err) + } + if line != "" { + logCh <- line + } + } + }() + + // Verify that the first 5 logs we get match the expected stream + for i := 0; i < 5; i++ { + select { + case log := <-logCh: + expected, err := expectedLogs.ReadString('\n') + if err != nil { + t.Fatalf("err: %v", err) + } + if log != expected { + t.Fatalf("bad: %q %q", expected, log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get log within timeout") + } + } +} + +type closableRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func newClosableRecorder() *closableRecorder { + r := httptest.NewRecorder() + closer := make(chan bool) + return &closableRecorder{r, closer} +} + +func (r *closableRecorder) CloseNotify() <-chan bool { + return r.closer +} diff --git a/command/agent/command.go b/command/agent/command.go index 247ffd1b9038..c4df74660d5a 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -471,6 +471,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err } + agent.logWriter = logWriter c.agent = agent // Setup the RPC listener diff --git a/command/agent/http.go b/command/agent/http.go index 082fc039b54e..cf1e1ab82dc0 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -251,6 +251,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { } s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7cc80cb240c7..5953f4c32059 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -28,6 +28,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { + return makeHTTPServerWithConfigLog(t, cb, nil) +} + +func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) { configTry := 0 RECONF: configTry += 1 @@ -36,7 +40,7 @@ RECONF: cb(conf) } - dir, agent := makeAgent(t, conf) + dir, agent := makeAgentLog(t, conf, l) servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { if configTry < 3 { diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index 74f3752ca99a..6a9a9ce7682b 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -21,6 +21,7 @@ The following endpoints are supported: * [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent * [`/v1/agent/self`](#agent_self) : Returns the local node configuration * [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode +* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent * [`/v1/agent/join/
`](#agent_join) : Triggers the local agent to join a node * [`/v1/agent/force-leave/`](#agent_force_leave): Forces removal of a node * [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check @@ -211,6 +212,17 @@ to aid human operators. If no reason is provided, a default value will be used i The return code is 200 on success. +### /v1/agent/monitor + +Added in Consul 0.7.2, This endpoint is hit with a GET and will stream logs from the +local agent until the connection is closed. + +The `?loglevel` flag is optional. If provided, its value should be a text string +containing a log level to filter on, such as `info`. If no loglevel is provided, +`info` will be used as a default. + +The return code is 200 on success. + ### /v1/agent/join/\ This endpoint is hit with a GET and is used to instruct the agent to attempt to @@ -403,7 +415,7 @@ body must look like: ], "Address": "127.0.0.1", "Port": 8000, - "EnableTagOverride": false, + "EnableTagOverride": false, "Check": { "DeregisterCriticalServiceAfter": "90m", "Script": "/usr/local/bin/check_redis.py", From c1786a4e8baed4b848ddb9c87117290741159197 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 28 Nov 2016 16:08:31 -0500 Subject: [PATCH 2/3] Add logWriter to agent Create() method --- command/agent/agent.go | 3 +- command/agent/agent_endpoint.go | 28 +++++++++++++------ command/agent/agent_endpoint_test.go | 41 ++++++++++++++++++++++++---- command/agent/agent_test.go | 17 ++++++------ command/agent/command.go | 3 +- command/agent/http_test.go | 7 +++-- command/agent/rpc_client_test.go | 2 +- command/util_test.go | 2 +- 8 files changed, 74 insertions(+), 29 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 6bee6fba071a..f3c5461b84ca 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -125,7 +125,7 @@ type Agent struct { // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer) (*Agent, error) { +func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr @@ -179,6 +179,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { config: config, logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, + logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), checkMonitors: make(map[types.CheckID]*CheckMonitor), checkTTLs: make(map[types.CheckID]*CheckTTL), diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index fd4586b20b09..cb4afbde927f 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -403,6 +403,15 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, nil } + var args structs.DCSpecificRequest + args.Datacenter = s.agent.config.Datacenter + s.parseToken(req, &args.Token) + // Validate that the given token has operator permissions + var reply structs.RaftConfigurationResponse + if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil { + return nil, err + } + // Get the provided loglevel logLevel := req.URL.Query().Get("loglevel") if logLevel == "" { @@ -441,6 +450,10 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( for { select { case <-notify: + s.agent.logWriter.DeregisterHandler(handler) + if handler.droppedCount > 0 { + s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount) + } return nil, nil case log := <-handler.logCh: resp.Write([]byte(log + "\n")) @@ -461,9 +474,10 @@ func (s *HTTPServer) syncChanges() { } type httpLogHandler struct { - filter *logutils.LevelFilter - logCh chan string - logger *log.Logger + filter *logutils.LevelFilter + logCh chan string + logger *log.Logger + droppedCount int } func (h *httpLogHandler) HandleLog(log string) { @@ -476,10 +490,8 @@ func (h *httpLogHandler) HandleLog(log string) { select { case h.logCh <- log: default: - // We can't log synchronously, since we are already being invoked - // from the logWriter, and a log will need to invoke Write() which - // already holds the lock. We must therefor do the log async, so - // as to not deadlock - go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint") + // Just increment a counter for dropped logs to this handler; we can't log now + // because the lock is already held by the LogWriter invoking this + h.droppedCount += 1 } } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 6bc875a3040f..1919855e562d 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/http/httptest" "os" @@ -1028,23 +1029,53 @@ func TestHTTPAgent_Monitor(t *testing.T) { expectedLogs := bytes.Buffer{} logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) - dir, srv := makeHTTPServerWithConfigLog(t, nil, logger) + dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) srv.agent.logWriter = logWriter defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() - // Begin streaming logs from the monitor endpoint - req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + // Try passing an invalid log level + req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=invalid", nil) resp := newClosableRecorder() + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 400 { + t.Fatalf("bad: %v", resp.Code) + } + body, _ := ioutil.ReadAll(resp.Body) + if !strings.Contains(string(body), "Unknown log level") { + t.Fatalf("bad: %s", body) + } + + // Begin streaming logs from the monitor endpoint + req, _ = http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + resp = newClosableRecorder() go func() { if _, err := srv.AgentMonitor(resp, req); err != nil { t.Fatalf("err: %s", err) } }() - // Write the incoming logs to a channel for reading - logCh := make(chan string, 0) + // Write the incoming logs from http to a channel for comparison + logCh := make(chan string, 5) + + // Block until the first log entry from http + testutil.WaitForResult(func() (bool, error) { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + return false, fmt.Errorf("err: %v", err) + } + if line == "" { + return false, fmt.Errorf("blank line") + } + logCh <- line + return true, nil + }, func(err error) { + t.Fatal(err) + }) + go func() { for { line, err := resp.Body.ReadString('\n') diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 92740f352099..42aed2f8f52e 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/raft" ) @@ -79,14 +80,14 @@ func nextConfig() *Config { return conf } -func makeAgentLog(t *testing.T, conf *Config, l io.Writer) (string, *Agent) { +func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) { dir, err := ioutil.TempDir("", "agent") if err != nil { t.Fatalf(fmt.Sprintf("err: %v", err)) } conf.DataDir = dir - agent, err := Create(conf, l) + agent, err := Create(conf, l, writer) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) @@ -112,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { t.Fatalf("err: %s", err) } - agent, err := Create(conf, nil) + agent, err := Create(conf, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -121,7 +122,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { } func makeAgent(t *testing.T, conf *Config) (string, *Agent) { - return makeAgentLog(t, conf, nil) + return makeAgentLog(t, conf, nil, nil) } func externalIP() (string, error) { @@ -845,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -979,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } config.Services = []*ServiceDefinition{svc2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1072,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1165,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } config.Checks = []*CheckDefinition{check2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/command.go b/command/agent/command.go index c4df74660d5a..d47537363a67 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -466,12 +466,11 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) { // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { c.Ui.Output("Starting Consul agent...") - agent, err := Create(config, logOutput) + agent, err := Create(config, logOutput, logWriter) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err } - agent.logWriter = logWriter c.agent = agent // Setup the RPC listener diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 5953f4c32059..b02b65970527 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/go-cleanhttp" ) @@ -28,10 +29,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { - return makeHTTPServerWithConfigLog(t, cb, nil) + return makeHTTPServerWithConfigLog(t, cb, nil, nil) } -func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) { +func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { configTry := 0 RECONF: configTry += 1 @@ -40,7 +41,7 @@ RECONF: cb(conf) } - dir, agent := makeAgentLog(t, conf, l) + dir, agent := makeAgentLog(t, conf, l, logWriter) servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { if configTry < 3 { diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go index a8f473f767cd..a6feb12f517c 100644 --- a/command/agent/rpc_client_test.go +++ b/command/agent/rpc_client_test.go @@ -61,7 +61,7 @@ RECONF: t.Fatalf("err: %s", err) } - dir, agent := makeAgentLog(t, conf, mult) + dir, agent := makeAgentLog(t, conf, mult, lw) rpc := NewAgentRPC(agent, l, mult, lw) rpcClient, err := NewRPCClient(l.Addr().String()) diff --git a/command/util_test.go b/command/util_test.go index 8260a42173d3..1a4a1e892759 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { } conf.DataDir = dir - a, err := agent.Create(conf, lw) + a, err := agent.Create(conf, lw, nil) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) From e036c7ace9b70da634a8ec57fe8dfdbcb7e8f398 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 28 Nov 2016 16:13:49 -0500 Subject: [PATCH 3/3] Add QueryOptions to api package's monitor --- api/agent.go | 3 ++- api/agent_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/api/agent.go b/api/agent.go index 934c8dbd0c64..b652a141e507 100644 --- a/api/agent.go +++ b/api/agent.go @@ -415,8 +415,9 @@ func (a *Agent) DisableNodeMaintenance() error { // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop the // log stream -func (a *Agent) Monitor(loglevel string, stopCh chan struct{}) (chan string, error) { +func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) (chan string, error) { r := a.c.newRequest("GET", "/v1/agent/monitor") + r.setQueryOptions(q) if loglevel != "" { r.params.Add("loglevel", loglevel) } diff --git a/api/agent_test.go b/api/agent_test.go index 970380565fc7..cc16f990fc33 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -566,7 +566,7 @@ func TestAgent_Monitor(t *testing.T) { agent := c.Agent() - logCh, err := agent.Monitor("info", nil) + logCh, err := agent.Monitor("info", nil, nil) if err != nil { t.Fatalf("err: %v", err) }