diff --git a/cmd/promtail/main.go b/cmd/promtail/main.go index fac17045e319d..8253864b746d1 100644 --- a/cmd/promtail/main.go +++ b/cmd/promtail/main.go @@ -12,30 +12,41 @@ import ( "github.com/prometheus/common/version" "github.com/weaveworks/common/logging" - "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/promtail" "github.com/grafana/loki/pkg/promtail/config" ) +var ( + configFile = "cmd/promtail/promtail-local-config.yaml" + cfg config.Config +) + func init() { prometheus.MustRegister(version.NewCollector("promtail")) } func main() { - var ( - configFile = "cmd/promtail/promtail-local-config.yaml" - config config.Config - ) flag.StringVar(&configFile, "config.file", "promtail.yml", "The config file.") - flagext.RegisterFlags(&config) + flagext.RegisterFlags(&cfg) flag.Parse() - util.InitLogger(&config.ServerConfig.Config) + util.InitLogger(&cfg.ServerConfig.Config) if configFile != "" { - if err := helpers.LoadConfig(configFile, &config); err != nil { - level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err) - os.Exit(1) + errChan := make(chan error, 1) + + m, err := promtail.InitMaster(configFile, cfg) + if err != nil { + errChan <- err + } else { + // Re-init the logger which will now honor a different log level set in ServerConfig.Config + util.InitLogger(&m.Promtail.Cfg.ServerConfig.Config) + if err := m.Promtail.Run(); err != nil { + level.Error(util.Logger).Log("msg", "error running promtail", "error", err) + errChan <- err + } + m.Promtail.Shutdown() + m.Cancel <- struct{}{} } } @@ -46,18 +57,11 @@ func main() { } util.InitLogger(&config.ServerConfig.Config) - p, err := promtail.New(config) - if err != nil { - level.Error(util.Logger).Log("msg", "error creating promtail", "error", err) + <-errChan + m.Promtail.Shutdown() os.Exit(1) - } - - level.Info(util.Logger).Log("msg", "Starting Promtail", "version", version.Info()) - - if err := p.Run(); err != nil { - level.Error(util.Logger).Log("msg", "error starting promtail", "error", err) + } else { + level.Error(util.Logger).Log("msg", "config file not found", "error", nil) os.Exit(1) } - - p.Shutdown() } diff --git a/pkg/promtail/promtail-test-config.yaml b/pkg/promtail/promtail-test-config.yaml new file mode 100644 index 0000000000000..bad8a5450db57 --- /dev/null +++ b/pkg/promtail/promtail-test-config.yaml @@ -0,0 +1,32 @@ +server: + http_listen_host: localhost + http_listen_port: 9080 + grpc_listen_host: localhost + +client: + url: http://localhost:3100/api/prom/push + batchwait: 10ms + batchsize: 10240 + backoff_config: + minbackoff: 100ms + maxbackoff: 5s + maxretries: 5 + +positions: + sync_period: 100ms + filename: positionsFileName + +target_config: + # Make sure the SyncPeriod is fast for test purposes, but not faster than the poll interval (250ms) + # to avoid a race between the sync() function and the tailers noticing when files are deleted + sync_period: 500ms + +scrape_configs: + - job_name: system + entry_parser: raw + static_configs: + - targets: + - localhost + labels: + job: varlogs + __path__: varLogsPath diff --git a/pkg/promtail/promtail.go b/pkg/promtail/promtail.go index 3fae7132c7083..26f1a0f8afab1 100644 --- a/pkg/promtail/promtail.go +++ b/pkg/promtail/promtail.go @@ -1,8 +1,15 @@ package promtail import ( - "github.com/cortexproject/cortex/pkg/util" + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/config" "github.com/grafana/loki/pkg/promtail/positions" @@ -12,14 +19,105 @@ import ( // Promtail is the root struct for Promtail... type Promtail struct { - client client.Client - positions *positions.Positions - targetManagers *targets.TargetManagers - server *server.Server + Client client.Client + Positions *positions.Positions + TargetManagers *targets.TargetManagers + Server *server.Server + + configFile string + Cfg *config.Config +} + +type Master struct { + Promtail *Promtail + defaultCfg *config.Config + Cancel chan struct{} +} + +func InitMaster(configFile string, cfg config.Config) (*Master, error) { + p, err := New(configFile, cfg) + if err != nil { + //level.Error(util.Logger).Log("msg", "error creating promtail", "error", err) + return nil, err + } + + cancel := make(chan struct{}) + + m := &Master{ + Promtail: p, + defaultCfg: &cfg, + Cancel: cancel, + } + + p.Server.HTTP.Path("/reload").Handler(http.HandlerFunc(m.Reload)) + + return m, nil +} + +func (m *Master) Reload(rw http.ResponseWriter, _ *http.Request) { + go m.DoReload() + rw.WriteHeader(http.StatusNoContent) +} + +func (m *Master) DoReload() { + errChan := make(chan error, 1) + + level.Info(util.Logger).Log("msg", "=== received RELOAD ===\n*** reloading") + // trigger server shutdown + m.Promtail.Server.Stop() + + // wait old promtail shutdown + <-m.Cancel + + p, err := New(m.Promtail.configFile, *m.defaultCfg) + if err != nil { + level.Error(util.Logger).Log("msg", "error reloading new promtail", "error", err) + errChan <- err + } + // Re-init the logger which will now honor a different log level set in ServerConfig.Config + util.InitLogger(&m.Promtail.Cfg.ServerConfig.Config) + + p.Server.HTTP.Path("/reload").Handler(http.HandlerFunc(m.Reload)) + + m.Promtail = p + + err = m.Promtail.Run() + if err != nil { + level.Error(util.Logger).Log("msg", "error starting new promtail", "error", err) + errChan <- err + } + + m.WaitSignals(errChan) } -// New makes a new Promtail. -func New(cfg config.Config) (*Promtail, error) { +func (m *Master) WaitSignals(errChan chan error) { + // can not directly use m.promtail.Run() to handler signals since reload call Shutdown() too + // avoid close of closed channel panic + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + buf := make([]byte, 1<<20) + select { + case <-errChan: + os.Exit(1) + case sig := <-sigs: + switch sig { + case syscall.SIGINT, syscall.SIGTERM: + level.Info(util.Logger).Log("msg", "=== received SIGINT/SIGTERM ===\n*** exiting") + m.Promtail.Shutdown() + return + case syscall.SIGQUIT: + stacklen := runtime.Stack(buf, true) + level.Info(util.Logger).Log("msg", "=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end", buf[:stacklen]) + } + } +} + +// New promtail from config file +func New(configFile string, cfg config.Config) (*Promtail, error) { + if err := helpers.LoadConfig(configFile, &cfg); err != nil { + return nil, err + } + positions, err := positions.New(util.Logger, cfg.PositionsConfig) if err != nil { return nil, err @@ -46,22 +144,24 @@ func New(cfg config.Config) (*Promtail, error) { } return &Promtail{ - client: client, - positions: positions, - targetManagers: tms, - server: server, + Client: client, + Positions: positions, + TargetManagers: tms, + Server: server, + configFile: configFile, + Cfg: &cfg, }, nil } // Run the promtail; will block until a signal is received. func (p *Promtail) Run() error { - return p.server.Run() + return p.Server.Run() } // Shutdown the promtail. func (p *Promtail) Shutdown() { - p.server.Shutdown() - p.targetManagers.Stop() - p.positions.Stop() - p.client.Stop() + p.Server.Shutdown() + p.TargetManagers.Stop() + p.Positions.Stop() + p.Client.Stop() } diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index 836300fd56b1c..8c8eeac33db5a 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -17,18 +18,13 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/common/model" - sd_config "github.com/prometheus/prometheus/discovery/config" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/config" - "github.com/grafana/loki/pkg/promtail/scrape" "github.com/grafana/loki/pkg/promtail/targets" ) @@ -79,15 +75,24 @@ func TestPromtail(t *testing.T) { }() // Run. + configFile, err := buildTestConfig(t, positionsFileName, testDir) + if err != nil { + t.Error("error creating configfile", err) + return + } - p, err := New(buildTestConfig(t, positionsFileName, testDir)) + cfg := config.Config{} + // config with default values + flagext.RegisterFlags(&cfg) + + m, err := InitMaster(configFile, cfg) if err != nil { t.Error("error creating promtail", err) return } go func() { - err = p.Run() + err = m.Promtail.Run() if err != nil { err = errors.Wrap(err, "Failed to start promtail") } @@ -138,7 +143,16 @@ func TestPromtail(t *testing.T) { //Pull out some prometheus metrics before shutting down metricsBytes, contentType := getPromMetrics(t) - p.Shutdown() + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/reload", httpTestPort)) + if err != nil { + t.Fatal("Could not touch reload endpoint", err) + } + + if resp.StatusCode != http.StatusNoContent { + t.Fatal("Received a non 204 status code from /reload endpoint", resp.StatusCode) + } + + m.Promtail.Shutdown() // Verify. @@ -419,7 +433,6 @@ func getPromMetrics(t *testing.T) ([]byte, string) { func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 { rb := map[string]float64{} - pr := textparse.New(bytes, contentType) for { et, err := pr.Next() @@ -448,60 +461,30 @@ func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName return rb } -func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) config.Config { - var clientURL flagext.URLValue - err := clientURL.Set("http://localhost:3100/api/prom/push") +func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) (string, error) { + configFile := "./promtail-test-config.yaml" + input, err := ioutil.ReadFile(configFile) if err != nil { - t.Fatal("Failed to parse client URL") - } - - cfg := config.Config{} - // Init everything with default values. - flagext.RegisterFlags(&cfg) - - // Make promtail listen on localhost to avoid prompts on MacOS. - cfg.ServerConfig.HTTPListenHost = "localhost" - cfg.ServerConfig.HTTPListenPort = httpTestPort - cfg.ServerConfig.GRPCListenHost = "localhost" - - // Override some of those defaults - cfg.ClientConfig.URL = clientURL - cfg.ClientConfig.BatchWait = 10 * time.Millisecond - cfg.ClientConfig.BatchSize = 10 * 1024 - - cfg.PositionsConfig.SyncPeriod = 100 * time.Millisecond - cfg.PositionsConfig.PositionsFile = positionsFileName - - targetGroup := targetgroup.Group{ - Targets: []model.LabelSet{{ - "localhost": "", - }}, - Labels: model.LabelSet{ - "job": "varlogs", - "__path__": model.LabelValue(logDirName + "/**/*.log"), - }, - Source: "", + t.Fatal(err) } - serviceConfig := sd_config.ServiceDiscoveryConfig{ - StaticConfigs: []*targetgroup.Group{ - &targetGroup, - }, + lines := strings.Split(string(input), "\n") + for i, line := range lines { + if strings.Contains(line, "varLogsPath") { + lines[i] = strings.Replace(line, "varLogsPath", logDirName+"/**/*.log", 1) + } + if strings.Contains(line, "positionsFileName") { + lines[i] = strings.Replace(line, "positionsFileName", positionsFileName, 1) + } } - scrapeConfig := scrape.Config{ - JobName: "", - EntryParser: api.Raw, - RelabelConfigs: nil, - ServiceDiscoveryConfig: serviceConfig, + output := strings.Join(lines, "\n") + err = ioutil.WriteFile(configFile, []byte(output), 0644) + if err != nil { + return "", err } - cfg.ScrapeConfig = append(cfg.ScrapeConfig, scrapeConfig) - - // Make sure the SyncPeriod is fast for test purposes, but not faster than the poll interval (250ms) - // to avoid a race between the sync() function and the tailers noticing when files are deleted - cfg.TargetConfig.SyncPeriod = 500 * time.Millisecond - return cfg + return configFile, err } func initRandom() { diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index f2081fb630d71..7d990fd7e516d 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -108,6 +108,7 @@ func New(cfg Config) (*Server, error) { Help: "Time (in seconds) spent serving HTTP requests.", Buckets: instrument.DefBuckets, }, []string{"method", "route", "status_code", "ws"}) + prometheus.Unregister(requestDuration) prometheus.MustRegister(requestDuration) // If user doesn't supply a logging implementation, by default instantiate