diff --git a/engine/filtering_helpers.go b/engine/filtering_helpers.go index 5bdff49c957..fefa8bbe68b 100644 --- a/engine/filtering_helpers.go +++ b/engine/filtering_helpers.go @@ -23,8 +23,12 @@ func getExpressionValue(values []*parser.Value, fields []string, point *protocol case parser.ValueBool: value, _ := strconv.ParseBool(value.Name) fieldValues = append(fieldValues, &protocol.FieldValue{BoolValue: &value}) - case parser.ValueString, parser.ValueRegex: + case parser.ValueString: fieldValues = append(fieldValues, &protocol.FieldValue{StringValue: &value.Name}) + case parser.ValueRegex: + regex, _ := value.GetCompiledRegex() + regexStr := regex.String() + fieldValues = append(fieldValues, &protocol.FieldValue{StringValue: ®exStr}) case parser.ValueTableName, parser.ValueSimpleName: diff --git a/integration/data_test.go b/integration/data_test.go index edb48b2e019..9d7e379b8e9 100644 --- a/integration/data_test.go +++ b/integration/data_test.go @@ -771,6 +771,20 @@ func (self *DataTestSuite) DistinctWithLimit(c *C) (Fun, Fun) { } } +func (self *DataTestSuite) InsensitiveRegexMatching(c *C) (Fun, Fun) { + return func(client Client) { + data := `[{"name":"foo","columns":["value"],"points":[["Paul"]]}]` + client.WriteJsonData(data, c) + }, func(client Client) { + series := client.RunQuery("select * from foo where value =~ /paul/i", c, "m") + c.Assert(series, HasLen, 1) + c.Assert(series[0].Name, Equals, "foo") + maps := ToMap(series[0]) + c.Assert(maps, HasLen, 1) + c.Assert(maps[0]["value"], Equals, "Paul") + } +} + func (self *DataTestSuite) DifferentColumnsInOnePost(c *C) (Fun, Fun) { return func(client Client) { data := `[{"name":"foo","columns":["val0", "val1"],"points":[["a", 1]]},{"name":"foo","columns":["val0"],"points":[["b"]]}]` diff --git a/wal/log.go b/wal/log.go index ff2ef6e1835..20dc3143dd9 100644 --- a/wal/log.go +++ b/wal/log.go @@ -23,73 +23,88 @@ type log struct { cachedSuffix uint32 } -func newLog(file *os.File, config *configuration.Configuration) (*log, error) { - info, err := file.Stat() +func newLog(logFileName string, config *configuration.Configuration) (*log, error) { + size, err := checkAndRepairLogFile(logFileName) if err != nil { return nil, err } - - size := uint64(info.Size()) - suffixString := strings.TrimLeft(path.Base(file.Name()), "log.") + suffixString := strings.TrimLeft(path.Base(logFileName), "log.") suffix, err := strconv.ParseUint(suffixString, 10, 32) if err != nil { return nil, err } + logger.Info("Opening log file %s", logFileName) + file, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + return nil, err + } - l := &log{ + return &log{ file: file, fileSize: size, closed: false, config: config, cachedSuffix: uint32(suffix), - } + }, nil +} - return l, l.check() +func getNextHeaderFromFile(file *os.File) (int, *entryHeader, error) { + hdr := &entryHeader{} + numberOfBytes, err := hdr.Read(file) + if err == io.EOF { + return 0, nil, nil + } + return numberOfBytes, hdr, err } -func (self *log) check() error { - file, err := self.dupLogFile() +func checkAndRepairLogFile(logFileName string) (uint64, error) { + info, err := os.Stat(logFileName) if err != nil { - return err + if os.IsNotExist(err) { + return 0, nil + } + return 0, err + } + size := info.Size() + if size == 0 { + return 0, nil } - info, err := file.Stat() + file, err := os.OpenFile(logFileName, os.O_RDWR, 0) if err != nil { - return err + return 0, err } - size := info.Size() + defer file.Close() offset, err := file.Seek(0, os.SEEK_SET) if err != nil { - return err + return 0, err } for { - n, hdr, err := self.getNextHeader(file) + n, hdr, err := getNextHeaderFromFile(file) if err != nil { - return err + return 0, err } if n == 0 || hdr.length == 0 { - logger.Warn("%s was truncated to %d since the file has a zero size request", self.file.Name(), offset) - return self.file.Truncate(offset) + logger.Warn("%s was truncated to %d:%d since the file has a zero size request", logFileName, size, offset) + return uint64(offset), file.Truncate(offset) } if offset+int64(n)+int64(hdr.length) > size { // file is incomplete, truncate - logger.Warn("%s was truncated to %d since the file ends prematurely", self.file.Name(), offset) - return self.file.Truncate(offset) + logger.Warn("%s was truncated to %d since the file ends prematurely", logFileName, offset) + return uint64(offset), file.Truncate(offset) } bytes := make([]byte, hdr.length) _, err = file.Read(bytes) if err != nil { - return err + return 0, err } - // this request is invalid truncate file req := &protocol.Request{} err = req.Decode(bytes) if err != nil { - logger.Warn("%s was truncated to %d since the end of the file contains invalid data", self.file.Name(), offset) + logger.Warn("%s was truncated to %d since the end of the file contains invalid data", logFileName, offset) // truncate file and return - return self.file.Truncate(offset) + return uint64(offset), file.Truncate(offset) } - offset += int64(n) + int64(hdr.length) } } @@ -183,15 +198,6 @@ func (self *log) dupAndReplayFromOffset(shardIds []uint32, offset int64, rn uint return replayChan, stopChan } -func (self *log) getNextHeader(file *os.File) (int, *entryHeader, error) { - hdr := &entryHeader{} - numberOfBytes, err := hdr.Read(file) - if err == io.EOF { - return 0, nil, nil - } - return numberOfBytes, hdr, err -} - func (self *log) skip(file *os.File, offset int64, rn uint32) error { if offset == -1 { _, err := file.Seek(0, os.SEEK_SET) @@ -212,7 +218,7 @@ func (self *log) skipRequest(file *os.File, hdr *entryHeader) (err error) { func (self *log) skipToRequest(file *os.File, requestNumber uint32) error { for { - n, hdr, err := self.getNextHeader(file) + n, hdr, err := getNextHeaderFromFile(file) if n == 0 { // EOF return nil @@ -246,7 +252,7 @@ func (self *log) replayFromFileLocation(file *os.File, defer func() { close(replayChan) }() for { - numberOfBytes, hdr, err := self.getNextHeader(file) + numberOfBytes, hdr, err := getNextHeaderFromFile(file) if numberOfBytes == 0 { break } diff --git a/wal/log_test.go b/wal/log_test.go new file mode 100644 index 00000000000..78eb1829cea --- /dev/null +++ b/wal/log_test.go @@ -0,0 +1,90 @@ +package wal + +import ( + "os" + "path/filepath" + + . "launchpad.net/gocheck" + + "github.com/influxdb/influxdb/protocol" +) + +func (_ *WalSuite) TestCheckLogFileIsEmpty(c *C) { + logfile := filepath.Join(c.MkDir(), "log.1") + f, e := os.OpenFile(logfile, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) + c.Assert(e, IsNil) + f.Close() + + _, e = checkAndRepairLogFile(logfile) + c.Assert(e, IsNil) +} + +func (_ *WalSuite) TestCheckLogFileIsOK(c *C) { + logfile := filepath.Join(c.MkDir(), "log.1") + f, e := os.OpenFile(logfile, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) + c.Assert(e, IsNil) + + tt := protocol.Request_QUERY + db := "abc" + req := &protocol.Request{Type: &tt, + Database: &db, + MultiSeries: []*protocol.Series{&protocol.Series{Name: &db}}} + bs, e := req.Encode() + c.Assert(e, IsNil) + + hdr := &entryHeader{requestNumber: 1, + shardId: 1, length: uint32(len(bs))} + _, e = hdr.Write(f) + c.Assert(e, IsNil) + _, e = f.Write(bs) + c.Assert(e, IsNil) + f.Close() + + size, e := checkAndRepairLogFile(logfile) + c.Assert(e, IsNil) + c.Assert(size, Equals, uint64(12+len(bs))) +} + +func (_ *WalSuite) TestCheckLogFileIsErrorRequest(c *C) { + logfile := filepath.Join(c.MkDir(), "log.1") + f, e := os.OpenFile(logfile, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) + c.Assert(e, IsNil) + hdr := &entryHeader{requestNumber: 1, + shardId: 1, length: 12} + _, e = hdr.Write(f) + c.Assert(e, IsNil) + + f.Write([]byte("0123456789012345678")) + f.Close() + size, e := checkAndRepairLogFile(logfile) + c.Assert(e, IsNil) + c.Assert(size, Equals, uint64(0)) +} + +func (_ *WalSuite) TestCheckLogFileHasInvalidData(c *C) { + logfile := filepath.Join(c.MkDir(), "log.1") + f, e := os.OpenFile(logfile, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644) + c.Assert(e, IsNil) + + tt := protocol.Request_QUERY + db := "abc" + req := &protocol.Request{Type: &tt, + Database: &db, + MultiSeries: []*protocol.Series{&protocol.Series{Name: &db}}} + bs, e := req.Encode() + c.Assert(e, IsNil) + + hdr := &entryHeader{requestNumber: 1, + shardId: 1, length: uint32(len(bs))} + _, e = hdr.Write(f) + c.Assert(e, IsNil) + _, e = f.Write(bs) + c.Assert(e, IsNil) + _, e = f.Write([]byte("012345678901234567801234567890123456780123456789012345678")) + c.Assert(e, IsNil) + f.Close() + + size, e := checkAndRepairLogFile(logfile) + c.Assert(e, IsNil) + c.Assert(size, Equals, uint64(12+len(bs))) +} diff --git a/wal/wal.go b/wal/wal.go index 566ce289f9d..7728a43a62f 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -373,12 +373,7 @@ func (self *WAL) createNewLog(firstRequestNumber uint32) (*log, error) { func (self *WAL) openLog(logFileName string) (*log, *index, error) { logger.Info("Opening log file %s", logFileName) - - logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - return nil, nil, err - } - log, err := newLog(logFile, self.config) + log, err := newLog(logFileName, self.config) if err != nil { return nil, nil, err }