Skip to content

Commit

Permalink
receiver: Avoid flushing the WAL twice on startup (#1606)
Browse files Browse the repository at this point in the history
* Avoid flushing the WAL twice when starting the receiver. Instead we wait
for the hashring to be loaded which will force a WAL flush. When
stopping the receiver we also stop the DB before flushing to
avoid re-opening the DB unnecessarily.

Signed-off-by: Devin Trejo <dtrejo@palantir.com>

* Trailing period.

Signed-off-by: Devin Trejo <dtrejo@palantir.com>

* Revert previous change where we closed the db before flushing. Move defintion of the db inside of the rungroup.

Signed-off-by: Devin Trejo <dtrejo@palantir.com>
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
dtrejod authored and GiedriusS committed Oct 28, 2019
1 parent a6cceb8 commit 41c2deb
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,6 @@ func runReceive(
MaxBlockDuration: tsdbBlockDuration,
WALCompression: true,
}
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

localStorage := &tsdb.ReadyStorage{}
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Expand Down Expand Up @@ -195,19 +189,25 @@ func runReceive(
{
// TSDB.
cancel := make(chan struct{})
// Before actually starting, we need to make sure
// the WAL is flushed.
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring ring is loaded.
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
Expand Down

0 comments on commit 41c2deb

Please sign in to comment.