diff --git a/api/agent.go b/api/agent.go index 41d8bc0b1f66..b652a141e507 100644 --- a/api/agent.go +++ b/api/agent.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "fmt" ) @@ -410,3 +411,38 @@ func (a *Agent) DisableNodeMaintenance() error { resp.Body.Close() return nil } + +// Monitor returns a channel which will receive streaming logs from the agent +// Providing a non-nil stopCh can be used to close the connection and stop the +// log stream +func (a *Agent) Monitor(loglevel string, stopCh chan struct{}, q *QueryOptions) (chan string, error) { + r := a.c.newRequest("GET", "/v1/agent/monitor") + r.setQueryOptions(q) + if loglevel != "" { + r.params.Add("loglevel", loglevel) + } + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + + logCh := make(chan string, 64) + go func() { + defer resp.Body.Close() + + scanner := bufio.NewScanner(resp.Body) + for { + select { + case <-stopCh: + close(logCh) + return + default: + } + if scanner.Scan() { + logCh <- scanner.Text() + } + } + }() + + return logCh, nil +} diff --git a/api/agent_test.go b/api/agent_test.go index 215d240dc906..cc16f990fc33 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -3,6 +3,7 @@ package api import ( "strings" "testing" + "time" ) func TestAgent_Self(t *testing.T) { @@ -558,6 +559,29 @@ func TestAgent_ForceLeave(t *testing.T) { } } +func TestAgent_Monitor(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + agent := c.Agent() + + logCh, err := agent.Monitor("info", nil, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the first log message and validate it + select { + case log := <-logCh: + if !strings.Contains(log, "[INFO] raft: Initial configuration") { + t.Fatalf("bad: %q", log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get a log message") + } +} + func TestServiceMaintenance(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/agent/agent.go b/command/agent/agent.go index 54e2b0c9ee96..f3c5461b84ca 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/coordinate" @@ -66,6 +67,9 @@ type Agent struct { // Output sink for logs logOutput io.Writer + // Used for streaming logs to + logWriter *logger.LogWriter + // We have one of a client or a server, depending // on our configuration server *consul.Server @@ -121,7 +125,7 @@ type Agent struct { // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer) (*Agent, error) { +func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr @@ -175,6 +179,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { config: config, logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, + logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), checkMonitors: make(map[types.CheckID]*CheckMonitor), checkTTLs: make(map[types.CheckID]*CheckTTL), diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index db33256af228..cb4afbde927f 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -2,12 +2,15 @@ package agent import ( "fmt" + "log" "net/http" "strconv" "strings" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" + "github.com/hashicorp/logutils" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -393,6 +396,74 @@ func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Re return nil, nil } +func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Only GET supported + if req.Method != "GET" { + resp.WriteHeader(405) + return nil, nil + } + + var args structs.DCSpecificRequest + args.Datacenter = s.agent.config.Datacenter + s.parseToken(req, &args.Token) + // Validate that the given token has operator permissions + var reply structs.RaftConfigurationResponse + if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil { + return nil, err + } + + // Get the provided loglevel + logLevel := req.URL.Query().Get("loglevel") + if logLevel == "" { + logLevel = "INFO" + } + + // Upper case the log level + logLevel = strings.ToUpper(logLevel) + + // Create a level filter + filter := logger.LevelFilter() + filter.MinLevel = logutils.LogLevel(logLevel) + if !logger.ValidateLevelFilter(filter.MinLevel, filter) { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Unknown log level: %s", filter.MinLevel))) + return nil, nil + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + // Set up a log handler + handler := &httpLogHandler{ + filter: filter, + logCh: make(chan string, 512), + logger: s.logger, + } + s.agent.logWriter.RegisterHandler(handler) + defer s.agent.logWriter.DeregisterHandler(handler) + + notify := resp.(http.CloseNotifier).CloseNotify() + + // Stream logs until the connection is closed + for { + select { + case <-notify: + s.agent.logWriter.DeregisterHandler(handler) + if handler.droppedCount > 0 { + s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount) + } + return nil, nil + case log := <-handler.logCh: + resp.Write([]byte(log + "\n")) + flusher.Flush() + } + } + + return nil, nil +} + // syncChanges is a helper function which wraps a blocking call to sync // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. @@ -401,3 +472,26 @@ func (s *HTTPServer) syncChanges() { s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } + +type httpLogHandler struct { + filter *logutils.LevelFilter + logCh chan string + logger *log.Logger + droppedCount int +} + +func (h *httpLogHandler) HandleLog(log string) { + // Check the log level + if !h.filter.Check([]byte(log)) { + return + } + + // Do a non-blocking send + select { + case h.logCh <- log: + default: + // Just increment a counter for dropped logs to this handler; we can't log now + // because the lock is already held by the LogWriter invoking this + h.droppedCount += 1 + } +} diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index a29b8ae9d2c5..1919855e562d 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -1,8 +1,11 @@ package agent import ( + "bytes" "errors" "fmt" + "io" + "io/ioutil" "net/http" "net/http/httptest" "os" @@ -12,6 +15,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" @@ -1019,3 +1023,99 @@ func TestHTTPAgentRegisterServiceCheck(t *testing.T) { t.Fatalf("bad: %#v", result["memcached_check2"]) } } + +func TestHTTPAgent_Monitor(t *testing.T) { + logWriter := logger.NewLogWriter(512) + expectedLogs := bytes.Buffer{} + logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) + + dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) + srv.agent.logWriter = logWriter + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Try passing an invalid log level + req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=invalid", nil) + resp := newClosableRecorder() + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 400 { + t.Fatalf("bad: %v", resp.Code) + } + body, _ := ioutil.ReadAll(resp.Body) + if !strings.Contains(string(body), "Unknown log level") { + t.Fatalf("bad: %s", body) + } + + // Begin streaming logs from the monitor endpoint + req, _ = http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + resp = newClosableRecorder() + go func() { + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %s", err) + } + }() + + // Write the incoming logs from http to a channel for comparison + logCh := make(chan string, 5) + + // Block until the first log entry from http + testutil.WaitForResult(func() (bool, error) { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + return false, fmt.Errorf("err: %v", err) + } + if line == "" { + return false, fmt.Errorf("blank line") + } + logCh <- line + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + go func() { + for { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + t.Fatalf("err: %v", err) + } + if line != "" { + logCh <- line + } + } + }() + + // Verify that the first 5 logs we get match the expected stream + for i := 0; i < 5; i++ { + select { + case log := <-logCh: + expected, err := expectedLogs.ReadString('\n') + if err != nil { + t.Fatalf("err: %v", err) + } + if log != expected { + t.Fatalf("bad: %q %q", expected, log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get log within timeout") + } + } +} + +type closableRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func newClosableRecorder() *closableRecorder { + r := httptest.NewRecorder() + closer := make(chan bool) + return &closableRecorder{r, closer} +} + +func (r *closableRecorder) CloseNotify() <-chan bool { + return r.closer +} diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 92740f352099..42aed2f8f52e 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/raft" ) @@ -79,14 +80,14 @@ func nextConfig() *Config { return conf } -func makeAgentLog(t *testing.T, conf *Config, l io.Writer) (string, *Agent) { +func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) { dir, err := ioutil.TempDir("", "agent") if err != nil { t.Fatalf(fmt.Sprintf("err: %v", err)) } conf.DataDir = dir - agent, err := Create(conf, l) + agent, err := Create(conf, l, writer) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) @@ -112,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { t.Fatalf("err: %s", err) } - agent, err := Create(conf, nil) + agent, err := Create(conf, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -121,7 +122,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { } func makeAgent(t *testing.T, conf *Config) (string, *Agent) { - return makeAgentLog(t, conf, nil) + return makeAgentLog(t, conf, nil, nil) } func externalIP() (string, error) { @@ -845,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -979,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } config.Services = []*ServiceDefinition{svc2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1072,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1165,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } config.Checks = []*CheckDefinition{check2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/command.go b/command/agent/command.go index 247ffd1b9038..d47537363a67 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -466,7 +466,7 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) { // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { c.Ui.Output("Starting Consul agent...") - agent, err := Create(config, logOutput) + agent, err := Create(config, logOutput, logWriter) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err diff --git a/command/agent/http.go b/command/agent/http.go index 082fc039b54e..cf1e1ab82dc0 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -251,6 +251,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { } s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7cc80cb240c7..b02b65970527 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/go-cleanhttp" ) @@ -28,6 +29,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { + return makeHTTPServerWithConfigLog(t, cb, nil, nil) +} + +func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { configTry := 0 RECONF: configTry += 1 @@ -36,7 +41,7 @@ RECONF: cb(conf) } - dir, agent := makeAgent(t, conf) + dir, agent := makeAgentLog(t, conf, l, logWriter) servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { if configTry < 3 { diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go index a8f473f767cd..a6feb12f517c 100644 --- a/command/agent/rpc_client_test.go +++ b/command/agent/rpc_client_test.go @@ -61,7 +61,7 @@ RECONF: t.Fatalf("err: %s", err) } - dir, agent := makeAgentLog(t, conf, mult) + dir, agent := makeAgentLog(t, conf, mult, lw) rpc := NewAgentRPC(agent, l, mult, lw) rpcClient, err := NewRPCClient(l.Addr().String()) diff --git a/command/util_test.go b/command/util_test.go index 8260a42173d3..1a4a1e892759 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { } conf.DataDir = dir - a, err := agent.Create(conf, lw) + a, err := agent.Create(conf, lw, nil) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index 74f3752ca99a..6a9a9ce7682b 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -21,6 +21,7 @@ The following endpoints are supported: * [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent * [`/v1/agent/self`](#agent_self) : Returns the local node configuration * [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode +* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent * [`/v1/agent/join/
`](#agent_join) : Triggers the local agent to join a node * [`/v1/agent/force-leave/`](#agent_force_leave): Forces removal of a node * [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check @@ -211,6 +212,17 @@ to aid human operators. If no reason is provided, a default value will be used i The return code is 200 on success. +### /v1/agent/monitor + +Added in Consul 0.7.2, This endpoint is hit with a GET and will stream logs from the +local agent until the connection is closed. + +The `?loglevel` flag is optional. If provided, its value should be a text string +containing a log level to filter on, such as `info`. If no loglevel is provided, +`info` will be used as a default. + +The return code is 200 on success. + ### /v1/agent/join/\ This endpoint is hit with a GET and is used to instruct the agent to attempt to @@ -403,7 +415,7 @@ body must look like: ], "Address": "127.0.0.1", "Port": 8000, - "EnableTagOverride": false, + "EnableTagOverride": false, "Check": { "DeregisterCriticalServiceAfter": "90m", "Script": "/usr/local/bin/check_redis.py",