diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go
index 93740e63c93..53ed2196781 100644
--- a/tsdb/engine/wal/wal.go
+++ b/tsdb/engine/wal/wal.go
@@ -54,6 +54,9 @@ const (
 	// MetaFileExtension is the file extension for the log files of new fields and measurements that get created
 	MetaFileExtension = "meta"
 
+	// CompactionExtension is the file extension we expect for compaction files
+	CompactionExtension = "CPT"
+
 	// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
 	// flushed to the index
 	MetaFlushInterval = 10 * time.Minute
@@ -86,9 +89,6 @@ var (
 	// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one
 	ErrCompactionRunning = errors.New("compaction running")
 
-	// ErrCompactionBlock gets returned if we're reading a compressed block and its a file compaction description
-	ErrCompactionBlock = errors.New("compaction description")
-
 	// ErrMemoryCompactionDone gets returned if we called to flushAndCompact to free up memory
 	// but a compaction has already been done to do so
 	ErrMemoryCompactionDone = errors.New("compaction already run to free up memory")
@@ -339,7 +339,7 @@ func (l *Log) DeleteSeries(keys []string) error {
 // seriesAndFields objects that were written in. It ignores file errors since those can't be
 // recovered.
 func (l *Log) readMetadataFile(fileName string) ([]*seriesAndFields, error) {
-	f, err := os.OpenFile(fileName, os.O_RDONLY, 0666)
+	f, err := os.OpenFile(fileName, os.O_RDWR, 0666)
 	if err != nil {
 		return nil, err
 	}
@@ -492,6 +492,12 @@ func (l *Log) openPartitionFiles() error {
 
 	for _, p := range l.partitions {
 		go func(p *Partition) {
+			// Recover from a partial compaction.
+			if err := p.recoverCompactionFile(); err != nil {
+				results <- fmt.Errorf("recover compaction files: %s", err)
+				return
+			}
+
 			fileNames, err := p.segmentFileNames()
 			if err != nil {
 				results <- err
@@ -651,12 +657,12 @@ func (l *Log) partition(key []byte) *Partition {
 	id := uint8(h.Sum64()%l.partitionCount + 1)
 	p := l.partitions[id]
 	if p == nil {
-		if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
+		p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index)
+		if err != nil {
 			panic(err)
-		} else {
-			p.enableLogging = l.EnableLogging
-			l.partitions[id] = p
 		}
+		p.enableLogging = l.EnableLogging
+		l.partitions[id] = p
 	}
 	return p
 }
@@ -699,10 +705,17 @@ type Partition struct {
 
 	lastWriteTime time.Time
 	enableLogging bool
+
+	// Used for mocking OS calls
+	os struct {
+		OpenCompactionFile func(name string, flag int, perm os.FileMode) (file *os.File, err error)
+		OpenSegmentFile    func(name string, flag int, perm os.FileMode) (file *os.File, err error)
+		Rename             func(oldpath, newpath string) error
+	}
 }
 
 func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
-	return &Partition{
+	p := &Partition{
 		id:             id,
 		path:           path,
 		maxSegmentSize: segmentSize,
@@ -712,7 +725,13 @@ func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64
 		readySeriesSize:   readySeriesSize,
 		index:             index,
 		flushColdInterval: flushColdInterval,
-	}, nil
+	}
+
+	p.os.OpenCompactionFile = os.OpenFile
+	p.os.OpenSegmentFile = os.OpenFile
+	p.os.Rename = os.Rename
+
+	return p, nil
 }
 
 // Close resets the caches and closes the currently open segment file
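The `os` struct of function fields added to `Partition` above is a small fault-injection seam: production code assigns the real `os.OpenFile`/`os.Rename`, and a test can swap in a failing stand-in for exactly one call site. A minimal self-contained sketch of the pattern — the `store` type and `main` are illustrative, not part of the WAL package:

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

type store struct {
	// Swappable OS calls, mirroring Partition.os in the diff.
	os struct {
		Rename func(oldpath, newpath string) error
	}
}

func newStore() *store {
	s := &store{}
	s.os.Rename = os.Rename // production default
	return s
}

func main() {
	s := newStore()

	// A test can force the rename to fail with a recognizable error:
	s.os.Rename = func(oldpath, newpath string) error {
		return errors.New("marker")
	}
	fmt.Println(s.os.Rename("01.000001.CPT", "01.000001.wal")) // prints "marker"
}
```

Keeping the fields in an embedded struct (rather than loose function fields) groups all mocked calls under one name, so call sites read almost like the real package: `p.os.Rename(...)` vs `os.Rename(...)`.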
@@ -798,7 +817,6 @@ func (p *Partition) newSegmentFile() error {
 	}
 	fileName := p.fileNameForSegment(p.currentSegmentID)
-
 	ff, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
 	if err != nil {
 		return err
 	}
@@ -816,7 +834,7 @@ func (p *Partition) fileNameForSegment(id uint32) string {
 
 // compactionFileName is the name of the temporary file used for compaction
 func (p *Partition) compactionFileName() string {
-	return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.CPT", p.id, 1))
+	return filepath.Join(p.path, fmt.Sprintf("%02d.%06d.%s", p.id, 1, CompactionExtension))
 }
 
 // fileIDFromName will return the segment ID from the file name
@@ -1012,7 +1030,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
 	}
 
 	// all compacted data from the segments will go into this file
-	compactionFile, err := os.OpenFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666)
+	compactionFile, err := p.os.OpenCompactionFile(p.compactionFileName(), os.O_CREATE|os.O_RDWR, 0666)
 	if err != nil {
 		return err
 	}
@@ -1028,7 +1046,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
 			break
 		}
 
-		f, err := os.OpenFile(n, os.O_RDONLY, 0666)
+		f, err := p.os.OpenSegmentFile(n, os.O_RDONLY, 0666)
 		if err != nil {
 			return err
 		}
@@ -1036,15 +1054,12 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
 
 		sf := newSegment(f)
 		var entries []*entry
 		for {
-			a, err := sf.readCompressedBlock()
-
-			if err == ErrCompactionBlock {
-				continue
+			name, a, err := sf.readCompressedBlock()
+			if name != "" {
+				continue // skip name blocks
 			} else if err != nil {
 				return err
-			}
-
-			if a == nil {
+			} else if a == nil {
 				break
 			}
@@ -1056,7 +1071,7 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
 			}
 		}
 
-		if err := p.writeCompactionEntry(compactionFile, entries); err != nil {
+		if err := p.writeCompactionEntry(compactionFile, f.Name(), entries); err != nil {
 			return err
 		}
 
@@ -1080,13 +1095,13 @@ func (p *Partition) compactFiles(c *compactionInfo, flush flushType) error {
 		return os.Remove(compactionFile.Name())
 	}
 
-	return os.Rename(compactionFile.Name(), p.fileNameForSegment(1))
+	return p.os.Rename(compactionFile.Name(), p.fileNameForSegment(1))
 }
 
 // writeCompactionEntry will write a marker for the beginning of the file we're compacting, a compressed block
 // for all entries, then a marker for the end of the file
-func (p *Partition) writeCompactionEntry(f *os.File, entries []*entry) error {
-	if err := p.writeCompactionFileName(f); err != nil {
+func (p *Partition) writeCompactionEntry(f *os.File, filename string, entries []*entry) error {
+	if err := p.writeCompactionFileName(f, filename); err != nil {
 		return err
 	}
 
@@ -1094,8 +1109,8 @@ func (p *Partition) writeCompactionEntry(f *os.File, entries []*entry) error {
 	for _, e := range entries {
 		block = append(block, marshalWALEntry(e.key, e.timestamp, e.data)...)
 	}
-	b := snappy.Encode(nil, block)
 
+	b := snappy.Encode(nil, block)
 	if _, err := f.Write(u64tob(uint64(len(b)))); err != nil {
 		return err
 	}
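The compaction loop above leans on the new three-value contract of `readCompressedBlock` (changed later in this diff): a non-empty `name` means the block carried a compacted file's name, a nil entry slice with an empty name means end of file. A hedged sketch of a consumer loop that would live inside this package, reusing its `segment` and `entry` types — `walkSegment` itself is hypothetical:

```go
// walkSegment collects every entry in a segment, skipping name blocks.
func walkSegment(sf *segment) ([]*entry, error) {
	var all []*entry
	for {
		name, entries, err := sf.readCompressedBlock()
		if err != nil {
			return nil, err
		} else if name != "" {
			continue // a compacted file-name block; no point data here
		} else if entries == nil {
			return all, nil // end of file
		}
		all = append(all, entries...)
	}
}
```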
@@ -1104,17 +1119,12 @@
 		return err
 	}
 
-	if err := p.writeCompactionFileName(f); err != nil {
-		return err
-	}
-
 	return f.Sync()
 }
 
 // writeCompactionFileName will write a compaction log length entry and the name of the file that is compacted
-func (p *Partition) writeCompactionFileName(f *os.File) error {
-	name := []byte(f.Name())
-	length := u64tob(uint64(len(name)))
+func (p *Partition) writeCompactionFileName(f *os.File, filename string) error {
+	length := u64tob(uint64(len([]byte(filename))))
 
 	// the beginning of the length has two bytes to indicate that this is a compaction log entry
 	length[0] = 0xFF
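The marker being written here is the two-byte `CompactSequence` (`0xFF, 0xFF`) stamped onto the front of the 8-byte length prefix; a name block's payload is the raw file name, while ordinary blocks are snappy-compressed and carry a plain length. A standalone sketch of that framing, assuming the `u64tob` helper is big-endian and that snappy lives at the import path shown; the `writeNameBlock`/`writeDataBlock` helpers are illustrative, not part of the WAL package:

```go
package main

import (
	"encoding/binary"
	"os"

	"github.com/golang/snappy"
)

// writeNameBlock frames a compacted file's name: an 8-byte big-endian length
// whose first two bytes are overwritten with the CompactSequence marker,
// followed by the file name as raw (uncompressed) bytes.
func writeNameBlock(f *os.File, filename string) error {
	length := make([]byte, 8)
	binary.BigEndian.PutUint64(length, uint64(len(filename)))
	length[0], length[1] = 0xFF, 0xFF // CompactSequence marker
	if _, err := f.Write(length); err != nil {
		return err
	}
	_, err := f.Write([]byte(filename))
	return err
}

// writeDataBlock frames an entry block: a plain 8-byte big-endian length
// followed by the snappy-compressed payload.
func writeDataBlock(f *os.File, raw []byte) error {
	b := snappy.Encode(nil, raw)
	length := make([]byte, 8)
	binary.BigEndian.PutUint64(length, uint64(len(b)))
	if _, err := f.Write(length); err != nil {
		return err
	}
	_, err := f.Write(b)
	return err
}

func main() {
	f, err := os.Create("example.CPT")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	if err := writeNameBlock(f, "01.000001.wal"); err != nil {
		panic(err)
	}
	if err := writeDataBlock(f, []byte("marshaled WAL entries...")); err != nil {
		panic(err)
	}
}
```

Because the reader zeroes the two marker bytes before decoding the length, a name block's length must fit in the remaining six bytes — far more than any file name needs.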
@@ -1124,13 +1134,86 @@
 		return err
 	}
 
-	if _, err := f.Write(name); err != nil {
+	if _, err := f.Write([]byte(filename)); err != nil {
 		return err
 	}
 
 	return nil
 }
 
+// recoverCompactionFile recovers from a partial compaction: it removes any
+// fully compacted segment files and deletes or promotes the compaction file.
+func (p *Partition) recoverCompactionFile() error {
+	path := p.compactionFileName()
+
+	// Open compaction file. Ignore if it doesn't exist.
+	f, err := p.os.OpenCompactionFile(path, os.O_RDWR, 0666)
+	if os.IsNotExist(err) {
+		return nil
+	} else if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	// Iterate through all named blocks.
+	sf := newSegment(f)
+	var hasData bool
+	for {
+		// Only read named blocks.
+		name, a, err := sf.readCompressedBlock()
+		if err != nil {
+			return fmt.Errorf("read name block: %s", err)
+		} else if name == "" && a == nil {
+			break // eof
+		} else if name == "" {
+			continue // skip unnamed blocks
+		}
+
+		// Read data for the named block.
+		if s, entries, err := sf.readCompressedBlock(); err != nil {
+			return fmt.Errorf("read data block: %s", err)
+		} else if s != "" {
+			return fmt.Errorf("unexpected double name block")
+		} else if entries == nil {
+			break // eof
+		}
+
+		// If data exists then ensure the underlying segment is deleted.
+		if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
+			return fmt.Errorf("remove segment: filename=%s, err=%s", name, err)
+		}
+
+		// Flag the compaction file as having data so that it will be renamed.
+		hasData = true
+	}
+	f.Close()
+
+	// If the compaction file did not have at least one named block written to
+	// it then it should be removed. This check is performed to ensure a partial
+	// compaction file does not overwrite an original segment file.
+	if !hasData {
+		if err := os.Remove(path); err != nil {
+			return fmt.Errorf("remove compaction file: %s", err)
+		}
+		return nil
+	}
+
+	// Double check that we are not renaming the compaction file over an
+	// existing segment file. The segment file should be removed in the
+	// recovery process but this simply double checks that removal occurred.
+	newpath := p.fileNameForSegment(1)
+	if _, err := os.Stat(newpath); !os.IsNotExist(err) {
+		return fmt.Errorf("cannot rename compaction file, segment exists: filename=%s", newpath)
+	}
+
+	// Rename compaction file to the first segment file.
+	if err := p.os.Rename(path, newpath); err != nil {
+		return fmt.Errorf("rename compaction file: %s", err)
+	}
+
+	return nil
+}
+
 // readFile will read a segment file and marshal its entries into the cache
 func (p *Partition) readFile(path string) (entries []*entry, err error) {
 	id, err := p.fileIDFromName(path)
@@ -1145,10 +1228,9 @@
 
 	sf := newSegment(f)
 	for {
-		a, err := sf.readCompressedBlock()
-
-		if err == ErrCompactionBlock {
-			continue
+		name, a, err := sf.readCompressedBlock()
+		if name != "" {
+			continue // skip name blocks
 		} else if err != nil {
 			f.Close()
 			return nil, err
@@ -1300,31 +1382,27 @@ func newSegment(f *os.File) *segment {
 
 // readCompressedBlock will read the next compressed block from the file and marshal the entries.
 // if we've hit the end of the file or corruption the entry array will be nil
-func (s *segment) readCompressedBlock() (entries []*entry, err error) {
+func (s *segment) readCompressedBlock() (name string, entries []*entry, err error) {
 	blockSize := int64(0)
 
 	n, err := s.f.Read(s.length)
 	if err == io.EOF {
-		return nil, nil
+		return "", nil, nil
 	} else if err != nil {
-		return nil, err
-	}
-	blockSize += int64(n)
-
-	if n != len(s.length) {
-		log.Println("unable to read the size of a data block from file: ", s.f.Name())
+		return "", nil, fmt.Errorf("read length: %s", err)
+	} else if n != len(s.length) {
 		// seek back before this length so we can start overwriting the file from here
+		log.Println("unable to read the size of a data block from file: ", s.f.Name())
 		s.f.Seek(-int64(n), 1)
-		return nil, nil
+		return "", nil, nil
 	}
+	blockSize += int64(n)
 
 	// Compacted WAL files will have a magic byte sequence that indicate the next part is a file name
 	// instead of a compressed block. We can ignore these bytes and the ensuing file name to get to the next block.
-	isCompactionFileNameBlock := false
-	if bytes.Compare(s.length[0:2], CompactSequence) == 0 {
-		s.length[0] = 0x00
-		s.length[1] = 0x00
-		isCompactionFileNameBlock = true
+	isCompactionFileNameBlock := bytes.Equal(s.length[0:2], CompactSequence)
+	if isCompactionFileNameBlock {
+		s.length[0], s.length[1] = 0x00, 0x00
 	}
 
 	dataLength := btou64(s.length)
@@ -1332,7 +1410,7 @@
 	// make sure we haven't hit the end of data. trailing end of file can be zero bytes
 	if dataLength == 0 {
 		s.f.Seek(-int64(len(s.length)), 1)
-		return nil, nil
+		return "", nil, nil
 	}
 
 	if len(s.block) < int(dataLength) {
@@ -1341,7 +1419,7 @@
 
 	n, err = s.f.Read(s.block[:dataLength])
 	if err != nil {
-		return nil, err
+		return "", nil, fmt.Errorf("read block: %s", err)
 	}
 	blockSize += int64(n)
 
@@ -1353,34 +1431,34 @@
 		// seek back to before this block and its size so we can overwrite the corrupt data
 		s.f.Seek(-int64(len(s.length)+n), 1)
 		if err := s.f.Truncate(s.size); err != nil {
-			return nil, err
+			return "", nil, fmt.Errorf("truncate(0): sz=%d, err=%s", s.size, err)
 		}
 
-		return nil, nil
+		return "", nil, nil
 	}
 
 	// skip the rest if this is just the filename from a compaction
 	if isCompactionFileNameBlock {
-		return nil, ErrCompactionBlock
+		return string(s.block[:dataLength]), nil, nil
 	}
 
-	buf, err := snappy.Decode(nil, s.block[:dataLength])
-
-	// if there was an error decoding, this is a corrupt block so we zero out the rest of the file
+	buf, err := snappy.Decode(nil, s.block[:dataLength])
 	if err != nil {
 		log.Println("corrupt compressed block in file: ", err.Error(), s.f.Name())
 		// go back to the start of this block and zero out the rest of the file
 		s.f.Seek(-int64(len(s.length)+n), 1)
 		if err := s.f.Truncate(s.size); err != nil {
-			return nil, err
+			return "", nil, fmt.Errorf("truncate(1): sz=%d, err=%s", s.size, err)
		}
 
-		return nil, nil
+		return "", nil, nil
 	}
 
 	// read in the individual data points from the decompressed wal block
 	bytesRead := 0
+	entries = make([]*entry, 0)
 	for {
 		if bytesRead >= len(buf) {
 			break
 		}
diff --git a/tsdb/engine/wal/wal_test.go b/tsdb/engine/wal/wal_test.go
index 4d2538d2afb..0dea03f08ba 100644
--- a/tsdb/engine/wal/wal_test.go
+++ b/tsdb/engine/wal/wal_test.go
@@ -3,10 +3,12 @@ package wal
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"reflect"
 	"testing"
 	"time"
@@ -685,6 +687,91 @@ func TestWAL_DeleteSeries(t *testing.T) {
 	}
 }
 
+// Ensure a partial compaction can be recovered from.
+func TestWAL_Compact_Recovery(t *testing.T) {
+	log := openTestWAL()
+	log.partitionCount = 1
+	log.CompactionThreshold = 0.7
+	log.ReadySeriesSize = 1024
+	log.flushCheckInterval = time.Minute
+	defer log.Close()
+	defer os.RemoveAll(log.path)
+
+	points := make([]map[string][][]byte, 0)
+	log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
+		points = append(points, pointsByKey)
+		return nil
+	}}
+
+	if err := log.Open(); err != nil {
+		t.Fatalf("couldn't open wal: %s", err.Error())
+	}
+
+	// Retrieve partition.
+	p := log.partitions[1]
+
+	codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
+		"value": {
+			ID:   uint8(1),
+			Name: "value",
+			Type: influxql.Float,
+		},
+	})
+
+	b := make([]byte, 70*5000)
+	for i := 1; i <= 100; i++ {
+		buf := bytes.NewBuffer(b)
+		for j := 1; j <= 1000; j++ {
+			buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", j, rand.Float64(), i))
+		}
+		buf.WriteString(fmt.Sprintf("cpu,host=A,region=uswest%d value=%.3f %d\n", rand.Int(), rand.Float64(), i))
+
+		// Write the batch out.
+		if err := log.WritePoints(parsePoints(buf.String(), codec), nil, nil); err != nil {
+			t.Fatalf("failed to write points: %s", err.Error())
+		}
+	}
+
+	// Mock second open call to fail.
+	p.os.OpenSegmentFile = func(name string, flag int, perm os.FileMode) (file *os.File, err error) {
+		if filepath.Base(name) == "01.000001.wal" {
+			return os.OpenFile(name, flag, perm)
+		}
+		return nil, errors.New("marker")
+	}
+	if err := p.flushAndCompact(thresholdFlush); err == nil || err.Error() != "marker" {
+		t.Fatalf("unexpected flush error: %s", err)
+	}
+	p.os.OpenSegmentFile = os.OpenFile
+
+	// Append second file to simulate partial write.
+	func() {
+		f, err := os.OpenFile(p.compactionFileName(), os.O_RDWR|os.O_APPEND, 0666)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer f.Close()
+
+		// Append filename and partial data.
+		if err := p.writeCompactionEntry(f, "01.000002.wal", []*entry{{key: []byte("foo"), data: []byte("bar"), timestamp: 100}}); err != nil {
+			t.Fatal(err)
+		}
+
+		// Truncate by a few bytes.
+		if fi, err := f.Stat(); err != nil {
+			t.Fatal(err)
+		} else if err = f.Truncate(fi.Size() - 2); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	// Now close and re-open the wal and ensure there are no errors.
+	log.Close()
+	if err := log.Open(); err != nil {
+		t.Fatalf("unexpected open error: %s", err)
+	}
+}
+
 // test that partitions get compacted and flushed when number of series hits compaction threshold
 // test that partitions get compacted and flushed when a single series hits the compaction threshold
 // test that writes slow down when the partition size threshold is hit
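The test's partial-write setup — append one complete compaction entry, then truncate the last two bytes before reopening — is a reusable trick for exercising crash recovery against a torn tail. A generic, self-contained sketch of the technique; the file name is illustrative:

```go
package main

import (
	"log"
	"os"
)

// chopTail removes the final n bytes of a file, mimicking a crash that
// interrupted a write mid-record.
func chopTail(path string, n int64) error {
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}
	return os.Truncate(path, fi.Size()-n)
}

func main() {
	// e.g. tear the last two bytes off a hypothetical compaction file,
	// then reopen the WAL and assert recovery succeeds.
	if err := chopTail("01.000001.CPT", 2); err != nil {
		log.Fatal(err)
	}
}
```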