diff --git a/database.go b/database.go index a91e7212e70..97cfa9a83af 100644 --- a/database.go +++ b/database.go @@ -221,6 +221,54 @@ func (m *Measurement) addSeries(s *Series) bool { return true } +// removeSeries will remove a series from the measurementIndex. Returns true if already removed +func (m *Measurement) dropSeries(seriesID uint32) bool { + if _, ok := m.seriesByID[seriesID]; !ok { + return true + } + s := m.seriesByID[seriesID] + tagset := string(marshalTags(s.Tags)) + + delete(m.series, tagset) + delete(m.seriesByID, seriesID) + + var ids []uint32 + for _, id := range m.seriesIDs { + if id != seriesID { + ids = append(ids, id) + } + } + m.seriesIDs = ids + + // remove this series id to the tag index on the measurement + // s.seriesByTagKeyValue is defined as map[string]map[string]seriesIDs + for k, v := range m.seriesByTagKeyValue { + values := v + for kk, vv := range values { + var ids []uint32 + for _, id := range vv { + if id != seriesID { + ids = append(ids, id) + } + } + // Check to see if we have any ids, if not, remove the key + if len(ids) == 0 { + delete(values, kk) + } else { + values[kk] = ids + } + } + // If we have no values, then we delete the key + if len(values) == 0 { + delete(m.seriesByTagKeyValue, k) + } else { + m.seriesByTagKeyValue[k] = values + } + } + + return true +} + // seriesByTags returns the Series that matches the given tagset. func (m *Measurement) seriesByTags(tags map[string]string) *Series { return m.series[string(marshalTags(tags))] @@ -1002,6 +1050,17 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup { return nil } +// dropSeries will delete all data with the seriesID +func (rp *RetentionPolicy) dropSeries(seriesID uint32) error { + for _, g := range rp.shardGroups { + err := g.dropSeries(seriesID) + if err != nil { + return err + } + } + return nil +} + func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) { for i, g := range rp.shardGroups { if g.ID == shardID { @@ -1075,6 +1134,32 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool { return idx.addSeries(s) } +// dropSeries removes the series from the in memory references +func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error { + for measurement, ids := range seriesByMeasurement { + for _, id := range ids { + // if the series is already gone, return + if db.series[id] == nil { + continue + } + + delete(db.series, id) + + // Remove series information from measurements + db.measurements[measurement].dropSeries(id) + + // Remove shard data + for _, rp := range db.policies { + if err := rp.dropSeries(id); err != nil { + return err + } + } + } + } + + return nil +} + // createMeasurementIfNotExists will either add a measurement object to the index or return the existing one. func (db *database) createMeasurementIfNotExists(name string) *Measurement { idx := db.measurements[name] diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 93a7e9af6d9..a5402d2b426 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1156,6 +1156,27 @@ func TestHandler_ShowContinuousQueries(t *testing.T) { } +func TestHandler_DropSeries(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } + + query := map[string]string{"db": "foo", "q": "DROP SERIES FROM cpu"} + status, _ = MustHTTP("GET", s.URL+`/query`, query, nil, "") + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } +} + func TestHandler_serveWriteSeries(t *testing.T) { srvr := OpenAuthenticatedServer(NewMessagingClient()) srvr.CreateDatabase("foo") diff --git a/influxql/ast.go b/influxql/ast.go index af07e48e77a..f994064eae4 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -985,11 +985,37 @@ func (s *ShowSeriesStatement) RequiredPrivileges() ExecutionPrivileges { // DropSeriesStatement represents a command for removing a series from the database. type DropSeriesStatement struct { - Name string + // The Id of the series being dropped (optional) + SeriesID uint32 + + // Data source that fields are extracted from (optional) + Source Source + + // An expression evaluated on data point (optional) + Condition Expr } // String returns a string representation of the drop series statement. -func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES %s", s.Name) } +func (s *DropSeriesStatement) String() string { + var buf bytes.Buffer + i, _ := buf.WriteString("DROP SERIES") + + if s.Source != nil { + _, _ = buf.WriteString(" FROM ") + _, _ = buf.WriteString(s.Source.String()) + } + if s.Condition != nil { + _, _ = buf.WriteString(" WHERE ") + _, _ = buf.WriteString(s.Condition.String()) + } + + // If we haven't written any data since the initial statement, then this was a SeriesID statement + if len(buf.String()) == i { + _, _ = buf.WriteString(fmt.Sprintf(" %d", s.SeriesID)) + } + + return buf.String() +} // RequiredPrivileges returns the privilige reqired to execute a DropSeriesStatement. func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges { diff --git a/influxql/parser.go b/influxql/parser.go index 26a9879b540..553cf950955 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -859,14 +859,31 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error) // This function assumes the "DROP SERIES" tokens have already been consumed. func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { stmt := &DropSeriesStatement{} + var err error - // Read the name of the series to drop. - lit, err := p.parseIdent() - if err != nil { + if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM { + // Parse source. + if stmt.Source, err = p.parseSource(); err != nil { + return nil, err + } + } else { + p.unscan() + } + + // Parse condition: "WHERE EXPR". + if stmt.Condition, err = p.parseCondition(); err != nil { return nil, err } - stmt.Name = lit + // If they didn't provide a FROM or a WHERE, they need to provide the SeriesID + if stmt.Condition == nil && stmt.Source == nil { + var id int + id, err = p.parseInt(0, math.MaxUint32) + if err != nil { + return nil, err + } + stmt.SeriesID = uint32(id) + } return stmt, nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 5a844d0dd53..45e197b2ba9 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -315,8 +315,33 @@ func TestParser_ParseStatement(t *testing.T) { // DROP SERIES statement { - s: `DROP SERIES myseries`, - stmt: &influxql.DropSeriesStatement{Name: "myseries"}, + s: `DROP SERIES 1`, + stmt: &influxql.DropSeriesStatement{SeriesID: 1}, + }, + { + s: `DROP SERIES FROM src`, + stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}}, + }, + { + s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, }, // SHOW CONTINUOUS QUERIES statement @@ -604,7 +629,9 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`}, {s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`}, {s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, - {s: `DROP SERIES`, err: `found EOF, expected identifier at line 1, char 13`}, + {s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`}, + {s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`}, + {s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, {s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`}, {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`}, @@ -933,6 +960,62 @@ func TestQuoteIdent(t *testing.T) { } } +// Ensure DropSeriesStatement can convert to a string +func TestDropSeriesStatement_String(t *testing.T) { + var tests = []struct { + s string + stmt influxql.Statement + }{ + { + s: `DROP SERIES 1`, + stmt: &influxql.DropSeriesStatement{SeriesID: 1}, + }, + { + s: `DROP SERIES FROM src`, + stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}}, + }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + { + s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + } + + for _, test := range tests { + s := test.stmt.String() + if s != test.s { + t.Errorf("error rendering string. expected %s, actual: %s", test.s, s) + } + } +} + func BenchmarkParserParseStatement(b *testing.B) { b.ReportAllocs() s := `SELECT field FROM "series" WHERE value > 10` diff --git a/metastore.go b/metastore.go index e956ad6150d..ff52c0f36a1 100644 --- a/metastore.go +++ b/metastore.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "time" - "unsafe" "github.com/boltdb/bolt" ) @@ -231,8 +230,7 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* // store the tag map for the series s := &Series{ID: uint32(id), Tags: tags} - idBytes := make([]byte, 4) - *(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id) + idBytes := u32tob(uint32(id)) if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil { return nil, err @@ -240,6 +238,21 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* return s, nil } +// dropSeries removes all seriesIDS for a given database/measurement +func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint32) error { + for measurement, ids := range seriesByMeasurement { + b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(measurement)) + if b != nil { + for _, id := range ids { + if err := b.Delete(u32tob(id)); err != nil { + return err + } + } + } + } + return nil +} + // loops through all the measurements and series in a database func (tx *metatx) indexDatabase(db *database) { // get the bucket that holds series data for the database diff --git a/server.go b/server.go index 4205425c5f7..d8d56291122 100644 --- a/server.go +++ b/server.go @@ -68,17 +68,20 @@ const ( createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40) deleteShardGroupMessageType = messaging.MessageType(0x41) + // Series messages + dropSeriesMessageType = messaging.MessageType(0x50) + // Measurement messages - createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x50) + createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60) // Continuous Query messages - createContinuousQueryMessageType = messaging.MessageType(0x60) + createContinuousQueryMessageType = messaging.MessageType(0x70) // Write series data messages (per-topic) - writeRawSeriesMessageType = messaging.MessageType(0x70) + writeRawSeriesMessageType = messaging.MessageType(0x80) // Privilege messages - setPrivilegeMessageType = messaging.MessageType(0x80) + setPrivilegeMessageType = messaging.MessageType(0x90) ) // Server represents a collection of metadata and raw metric data. @@ -1504,6 +1507,46 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, return nil } +func (s *Server) applyDropSeries(m *messaging.Message) error { + var c dropSeriesCommand + mustUnmarshalJSON(m.Data, &c) + + database := s.databases[c.Database] + if database == nil { + return ErrDatabaseNotFound + } + + // Remove from metastore. + err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { + if err := tx.dropSeries(c.Database, c.SeriesByMeasurement); err != nil { + return err + } + + // Delete series from the database. + if err := database.dropSeries(c.SeriesByMeasurement); err != nil { + return fmt.Errorf("failed to remove series from index") + } + return nil + }) + if err != nil { + return err + } + + return nil +} + +// DropSeries deletes from an existing series. +func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]uint32) error { + c := dropSeriesCommand{Database: database, SeriesByMeasurement: seriesByMeasurement} + _, err := s.broadcast(dropSeriesMessageType, c) + return err +} + +type dropSeriesCommand struct { + Database string `json:"datbase"` + SeriesByMeasurement map[string][]uint32 `json:"seriesIds"` +} + // Point defines the values that will be written to the database type Point struct { Name string @@ -1872,7 +1915,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowUsersStatement: res = s.executeShowUsersStatement(stmt, user) case *influxql.DropSeriesStatement: - continue + res = s.executeDropSeriesStatement(stmt, database, user) case *influxql.ShowSeriesStatement: res = s.executeShowSeriesStatement(stmt, database, user) case *influxql.ShowMeasurementsStatement: @@ -2016,6 +2059,60 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U return &Result{Err: s.DeleteUser(q.Name)} } +func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { + s.mu.RLock() + + seriesByMeasurement := make(map[string][]uint32) + // Handle the simple `DROP SERIES ` case. + if stmt.Source == nil && stmt.Condition == nil { + for _, db := range s.databases { + for _, m := range db.measurements { + if m.seriesByID[stmt.SeriesID] != nil { + seriesByMeasurement[m.Name] = []uint32{stmt.SeriesID} + } + } + } + + s.mu.RUnlock() + return &Result{Err: s.DropSeries(database, seriesByMeasurement)} + } + + // Handle the more complicated `DROP SERIES` with sources and/or conditions... + + // Find the database. + db := s.databases[database] + if db == nil { + s.mu.RUnlock() + return &Result{Err: ErrDatabaseNotFound} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourceOrDB(stmt.Source, db) + if err != nil { + s.mu.RUnlock() + return &Result{Err: err} + } + + for _, m := range measurements { + var ids seriesIDs + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + filters := map[uint32]influxql.Expr{} + ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) + + // TODO: check return of walkWhereForSeriesIds for fields + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + seriesByMeasurement[m.Name] = ids + } + s.mu.RUnlock() + + return &Result{Err: s.DropSeries(database, seriesByMeasurement)} +} + func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result { s.mu.RLock() defer s.mu.RUnlock() @@ -2690,6 +2787,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) { err = s.applySetPrivilege(m) case createContinuousQueryMessageType: err = s.applyCreateContinuousQueryCommand(m) + case dropSeriesMessageType: + err = s.applyDropSeries(m) } // Sync high water mark and errors. diff --git a/server_test.go b/server_test.go index f9fd351c524..69f7baa4c65 100644 --- a/server_test.go +++ b/server_test.go @@ -830,6 +830,133 @@ func TestServer_WriteSeries(t *testing.T) { } } +// Ensure the server can drop a series. +func TestServer_DropSeries(t *testing.T) { + c := NewMessagingClient() + s := OpenServer(c) + defer s.Close() + s.CreateDatabase("foo") + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.SetDefaultRetentionPolicy("foo", "raw") + s.CreateUser("susy", "pass", false) + + // Write series with one point to the database. + tags := map[string]string{"host": "serverA", "region": "uswest"} + index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + // Ensure series exiss + results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + // Drop series + results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu`), "foo", nil) + if results.Error() != nil { + t.Fatalf("unexpected error: %s", results.Error()) + } + + results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":[]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + +} + +// Ensure Drop Series can: +// write to measurement cpu with tags region=uswest host=serverA +// write to measurement cpu with tags region=uswest host=serverB +// drop one of those series +// ensure that the dropped series is gone +// ensure that we can still query: select value from cpu where region=uswest +func TestServer_DropSeriesTagsPreserved(t *testing.T) { + c := NewMessagingClient() + s := OpenServer(c) + defer s.Close() + s.CreateDatabase("foo") + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.SetDefaultRetentionPolicy("foo", "raw") + s.CreateUser("susy", "pass", false) + + // Write series with one point to the database. + tags := map[string]string{"host": "serverA", "region": "uswest"} + index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + tags = map[string]string{"host": "serverB", "region": "uswest"} + index, err = s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:01Z"), Values: map[string]interface{}{"value": float64(33.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"],["serverB","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu where host='serverA'`), "foo", nil) + if results.Error() != nil { + t.Fatalf("unexpected error: %s", results.Error()) + } + + results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverB","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverA'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 0 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverB'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where region='uswest'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } +} + // Ensure the server can execute a query and return the data correctly. func TestServer_ExecuteQuery(t *testing.T) { s := OpenServer(NewMessagingClient()) diff --git a/shard.go b/shard.go index abf376f5251..857e7a02998 100644 --- a/shard.go +++ b/shard.go @@ -45,6 +45,17 @@ func newShardGroup() *ShardGroup { return &ShardGroup{} } // Duration returns the duration between the shard group's start and end time. func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) } +// dropSeries will delete all data with the seriesID +func (g *ShardGroup) dropSeries(seriesID uint32) error { + for _, s := range g.Shards { + err := s.dropSeries(seriesID) + if err != nil { + return err + } + } + return nil +} + // newShard returns a new initialized Shard instance. func newShard() *Shard { return &Shard{} } @@ -145,8 +156,13 @@ func (s *Shard) writeSeries(batch []byte) error { }) } -func (s *Shard) deleteSeries(name string) error { - panic("not yet implemented") // TODO +func (s *Shard) dropSeries(seriesID uint32) error { + if s.store == nil { + return nil + } + return s.store.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket(u32tob(seriesID)) + }) } // Shards represents a list of shards.