Skip to content

Commit

Permalink
Clean up abandoned WAL directories (#304)
Browse files Browse the repository at this point in the history
Remove WAL directories that are no longer associated with a `ManagedInstance`
that this agent is responsible for. The storage directories used by active
instances are compared to all subdirectories under the `wal_directory` root.
Any that are not associated with an instance and that haven't been modified
in over a configured amount of time are removed.

How often the check is run and the cutoff for how recently a WAL must have
been modified are controlled by the `wal_cleanup_period` and
`wal_cleanup_age` settings respectively, under the top-level Prometheus
configuration. When omitted, they default to a period of 30 minutes and
recently modified threshold of 12 hours.

Fixes #132
  • Loading branch information
56quarters authored Jan 8, 2021
1 parent ef2286f commit 2e5b06a
Show file tree
Hide file tree
Showing 13 changed files with 494 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ this platform. FreeBSD builds will return in a future release.
file. This was prevented by accidentally writing to a readonly volume mount.
(@rfratto)

- [ENHANCEMENT] `wal_cleanup_age` and `wal_cleanup_period` have been added to the
top-level Prometheus configuration section. These settings control how Write Ahead
Logs (WALs) that are not associated with any instances are cleaned up. By default,
WALs not associated with an instance that have not been written in the last 12 hours
are eligible to be cleaned up. This cleanup can be disabled by setting `wal_cleanup_period`
to `0`. (#304) (@56quarters)

# v0.9.1 (2021-01-04)

NOTE: FreeBSD builds will not be included for this release. There is a bug in an
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ define one instance.
# Configure the directory used by instances to store their WAL.
[wal_directory: <string> | default = ""]
# Configures how long ago an abandoned (not associated with an instance) WAL
# may be written to before being eligible to be deleted
[wal_cleanup_age: <duration> | default = "12h"]
# Configures how often checks for abandoned WALs to be deleted are performed.
# A value of 0 disables periodic cleanup of abandoned WALs
[wal_cleanup_period: <duration> | default = "30m"]
# The list of Prometheus instances to launch with the agent.
configs:
[- <prometheus_instance_config>]
Expand Down
20 changes: 19 additions & 1 deletion pkg/prom/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
DefaultConfig = Config{
Global: config.DefaultGlobalConfig,
InstanceRestartBackoff: instance.DefaultBasicManagerConfig.InstanceRestartBackoff,
WALCleanupAge: DefaultCleanupAge,
WALCleanupPeriod: DefaultCleanupPeriod,
ServiceConfig: ha.DefaultConfig,
ServiceClientConfig: client.DefaultConfig,
InstanceMode: DefaultInstanceMode,
Expand Down Expand Up @@ -70,6 +72,8 @@ type Config struct {

Global config.GlobalConfig `yaml:"global"`
WALDir string `yaml:"wal_directory"`
WALCleanupAge time.Duration `yaml:"wal_cleanup_age"`
WALCleanupPeriod time.Duration `yaml:"wal_cleanup_period"`
ServiceConfig ha.Config `yaml:"scraping_service"`
ServiceClientConfig client.Config `yaml:"scraping_service_client"`
Configs []instance.Config `yaml:"configs,omitempty"`
Expand Down Expand Up @@ -126,6 +130,8 @@ func (c *Config) ApplyDefaults() error {
// RegisterFlags defines flags corresponding to the Config.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.WALDir, "prometheus.wal-directory", "", "base directory to store the WAL in")
f.DurationVar(&c.WALCleanupAge, "prometheus.wal-cleanup-age", DefaultConfig.WALCleanupAge, "remove abandoned (unused) WALs older than this")
f.DurationVar(&c.WALCleanupPeriod, "prometheus.wal-cleanup-period", DefaultConfig.WALCleanupPeriod, "how often to check for abandoned WALs")
f.DurationVar(&c.InstanceRestartBackoff, "prometheus.instance-restart-backoff", DefaultConfig.InstanceRestartBackoff, "how long to wait before restarting a failed Prometheus instance")

c.ServiceConfig.RegisterFlagsWithPrefix("prometheus.service.", f)
Expand All @@ -141,7 +147,8 @@ type Agent struct {
logger log.Logger
reg prometheus.Registerer

cm instance.Manager
cm instance.Manager
cleaner *WALCleaner

instanceFactory instanceFactory

Expand Down Expand Up @@ -173,6 +180,16 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins
// collect metrics on the number of active configs.
a.cm = instance.NewCountingManager(reg, a.cm)

// Periodically attempt to clean up WALs from instances that aren't being run by
// this agent anymore.
a.cleaner = NewWALCleaner(
a.logger,
a.cm,
cfg.WALDir,
cfg.WALCleanupAge,
cfg.WALCleanupPeriod,
)

allConfigsValid := true
for _, c := range cfg.Configs {
if err := a.cm.ApplyConfig(c); err != nil {
Expand Down Expand Up @@ -230,6 +247,7 @@ func (a *Agent) Stop() {
level.Error(a.logger).Log("msg", "failed to stop scraping service server", "err", err)
}
}
a.cleaner.Stop()
a.cm.Stop()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/prom/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ func (i *fakeInstance) TargetsActive() map[string][]*scrape.Target {
return nil
}

func (i *fakeInstance) StorageDirectory() string {
return ""
}

type fakeInstanceFactory struct {
mut sync.Mutex
mocks []*fakeInstance
Expand Down
270 changes: 270 additions & 0 deletions pkg/prom/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package prom

import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/grafana/agent/pkg/prom/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promwal "github.com/prometheus/prometheus/tsdb/wal"
)

const (
DefaultCleanupAge = 12 * time.Hour
DefaultCleanupPeriod = 30 * time.Minute
)

var (
discoveryError = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "agent_prometheus_cleaner_storage_error_total",
Help: "Errors encountered discovering local storage paths",
},
[]string{"storage"},
)

segmentError = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "agent_prometheus_cleaner_segment_error_total",
Help: "Errors encountered finding most recent WAL segments",
},
[]string{"storage"},
)

managedStorage = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "agent_prometheus_cleaner_managed_storage",
Help: "Number of storage directories associated with managed instances",
},
)

abandonedStorage = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "agent_prometheus_cleaner_abandoned_storage",
Help: "Number of storage directories not associated with any managed instance",
},
)

cleanupRunsSuccess = promauto.NewCounter(
prometheus.CounterOpts{
Name: "agent_prometheus_cleaner_success_total",
Help: "Number of successfully removed abandoned WALs",
},
)

cleanupRunsErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "agent_prometheus_cleaner_errors_total",
Help: "Number of errors removing abandoned WALs",
},
)

cleanupTimes = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "agent_prometheus_cleaner_cleanup_seconds",
Help: "Time spent performing each periodic WAL cleanup",
},
)
)

// lastModifiedFunc gets the last modified time of the most recent segment of a WAL
type lastModifiedFunc func(path string) (time.Time, error)

func lastModified(path string) (time.Time, error) {
existing, err := promwal.Open(nil, path)
if err != nil {
return time.Time{}, err
}

// We don't care if there are errors closing the abandoned WAL
defer func() { _ = existing.Close() }()

_, last, err := existing.Segments()
if err != nil {
return time.Time{}, fmt.Errorf("unable to open WAL: %w", err)
}

if last == -1 {
return time.Time{}, fmt.Errorf("unable to determine most recent segment for %s", path)
}

// full path to the most recent segment in this WAL
lastSegment := promwal.SegmentName(path, last)
segmentFile, err := os.Stat(lastSegment)
if err != nil {
return time.Time{}, fmt.Errorf("unable to determine mtime for %s segment: %w", lastSegment, err)
}

return segmentFile.ModTime(), nil
}

// WALCleaner periodically checks for Write Ahead Logs (WALs) that are not associated
// with any active instance.ManagedInstance and have not been written to in some configured
// amount of time and deletes them.
type WALCleaner struct {
logger log.Logger
instanceManager instance.Manager
walDirectory string
walLastModified lastModifiedFunc
minAge time.Duration
period time.Duration
done chan bool
}

// NewWALCleaner creates a new cleaner that looks for abandoned WALs in the given
// directory and removes them if they haven't been modified in over minAge. Starts
// a goroutine to periodically run the cleanup method in a loop
func NewWALCleaner(logger log.Logger, manager instance.Manager, walDirectory string, minAge time.Duration, period time.Duration) *WALCleaner {
c := &WALCleaner{
logger: log.With(logger, "component", "cleaner"),
instanceManager: manager,
walDirectory: filepath.Clean(walDirectory),
walLastModified: lastModified,
minAge: DefaultCleanupAge,
period: DefaultCleanupPeriod,
done: make(chan bool),
}

if minAge > 0 {
c.minAge = minAge
}

// We allow a period of 0 here because '0' means "don't run the task". This
// is handled by not running a ticker at all in the run method.
if period >= 0 {
c.period = period
}

go c.run()
return c
}

// getManagedStorage gets storage directories used for each ManagedInstance
func (c *WALCleaner) getManagedStorage(instances map[string]instance.ManagedInstance) map[string]bool {
out := make(map[string]bool)

for _, inst := range instances {
out[inst.StorageDirectory()] = true
}

return out
}

// getAllStorage gets all storage directories under walDirectory
func (c *WALCleaner) getAllStorage() []string {
var out []string

_ = filepath.Walk(c.walDirectory, func(p string, info os.FileInfo, err error) error {
if os.IsNotExist(err) {
// The root WAL directory doesn't exist. Maybe this Agent isn't responsible for any
// instances yet. Log at debug since this isn't a big deal. We'll just try to crawl
// the direction again on the next periodic run.
level.Debug(c.logger).Log("msg", "WAL storage path does not exist", "path", p, "err", err)
} else if err != nil {
// Just log any errors traversing the WAL directory. This will potentially result
// in a WAL (that has incorrect permissions or some similar problem) not being cleaned
// up. This is better than preventing *all* other WALs from being cleaned up.
discoveryError.WithLabelValues(p).Inc()
level.Warn(c.logger).Log("msg", "unable to traverse WAL storage path", "path", p, "err", err)
} else if info.IsDir() && filepath.Dir(p) == c.walDirectory {
// Single level below the root are instance storage directories (including WALs)
out = append(out, p)
}

return nil
})

return out
}

// getAbandonedStorage gets the full path of storage directories that aren't associated with
// an active instance and haven't been written to within a configured duration (usually several
// hours or more).
func (c *WALCleaner) getAbandonedStorage(all []string, managed map[string]bool, now time.Time) []string {
var out []string

for _, dir := range all {
if managed[dir] {
level.Debug(c.logger).Log("msg", "active WAL", "name", dir)
continue
}

walDir := wal.SubDirectory(dir)
mtime, err := c.walLastModified(walDir)
if err != nil {
segmentError.WithLabelValues(dir).Inc()
level.Warn(c.logger).Log("msg", "unable to find segment mtime of WAL", "name", dir, "err", err)
continue
}

diff := now.Sub(mtime)
if diff > c.minAge {
// The last segment for this WAL was modified more then $minAge (positive number of hours)
// in the past. This makes it a candidate for deletion since it's also not associated with
// any Instances this agent knows about.
out = append(out, dir)
}

level.Debug(c.logger).Log("msg", "abandoned WAL", "name", dir, "mtime", mtime, "diff", diff)
}

return out
}

// run cleans up abandoned WALs (if period != 0) in a loop periodically until stopped
func (c *WALCleaner) run() {
// A period of 0 means don't run a cleanup task
if c.period == 0 {
return
}

ticker := time.NewTicker(c.period)
defer ticker.Stop()

for {
select {
case <-c.done:
level.Debug(c.logger).Log("msg", "stopping cleaner...")
return
case <-ticker.C:
c.cleanup()
}
}
}

// cleanup removes any abandoned and unused WAL directories. Note that it shouldn't be
// necessary to call this method explicitly in most cases since it will be run periodically
// in a goroutine (started when WALCleaner is created).
func (c *WALCleaner) cleanup() {
start := time.Now()
all := c.getAllStorage()
managed := c.getManagedStorage(c.instanceManager.ListInstances())
abandoned := c.getAbandonedStorage(all, managed, time.Now())

managedStorage.Set(float64(len(managed)))
abandonedStorage.Set(float64(len(abandoned)))

for _, a := range abandoned {
level.Info(c.logger).Log("msg", "deleting abandoned WAL", "name", a)
err := os.RemoveAll(a)
if err != nil {
level.Error(c.logger).Log("msg", "failed to delete abandoned WAL", "name", a, "err", err)
cleanupRunsErrors.Inc()
} else {
cleanupRunsSuccess.Inc()
}
}

cleanupTimes.Observe(time.Since(start).Seconds())
}

// Stop the cleaner and any background tasks running
func (c *WALCleaner) Stop() {
close(c.done)
}
Loading

0 comments on commit 2e5b06a

Please sign in to comment.