From fd9c19ad9c4a0c1e20522fce98de8cd79de6246c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 17 Feb 2015 16:29:25 -0700 Subject: [PATCH 01/32] wire up drop series parsing --- influxql/ast.go | 27 +++++++++++++++++-- influxql/parser.go | 27 ++++++++++++++----- influxql/parser_test.go | 57 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index af07e48e77a..3d4a02769d9 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -985,11 +985,34 @@ func (s *ShowSeriesStatement) RequiredPrivileges() ExecutionPrivileges { // DropSeriesStatement represents a command for removing a series from the database. type DropSeriesStatement struct { - Name string + // The Id of the series being dropped (optional) + SeriesID uint32 + + // Data source that fields are extracted from (optional) + Source Source + + // An expression evaluated on data point (optional) + Condition Expr } // String returns a string representation of the drop series statement. -func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES %s", s.Name) } +func (s *DropSeriesStatement) String() string { + var buf bytes.Buffer + _, _ = buf.WriteString("DROP SERIES ") + + if s.Source != nil { + _, _ = buf.WriteString("FROM ") + _, _ = buf.WriteString(s.Source.String()) + if s.Condition != nil { + _, _ = buf.WriteString(" WHERE ") + _, _ = buf.WriteString(s.Condition.String()) + } + } else { + _, _ = buf.WriteString(fmt.Sprintf("%d", s.SeriesID)) + } + + return buf.String() +} // RequiredPrivileges returns the privilige reqired to execute a DropSeriesStatement. func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges { diff --git a/influxql/parser.go b/influxql/parser.go index 26a9879b540..452a9227588 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -859,14 +859,29 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error) // This function assumes the "DROP SERIES" tokens have already been consumed. func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { stmt := &DropSeriesStatement{} + var err error - // Read the name of the series to drop. - lit, err := p.parseIdent() - if err != nil { - return nil, err - } - stmt.Name = lit + tok, _, _ := p.scanIgnoreWhitespace() + if tok == FROM { + // Parse source. + if stmt.Source, err = p.parseSource(); err != nil { + return nil, err + } + // Parse condition: "WHERE EXPR". + if stmt.Condition, err = p.parseCondition(); err != nil { + return nil, err + } + + } else { + p.unscan() + var id int + id, err = p.parseInt(0, math.MaxUint32) + if err != nil { + return nil, err + } + stmt.SeriesID = uint32(id) + } return stmt, nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 5a844d0dd53..783e58a3423 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -315,8 +315,23 @@ func TestParser_ParseStatement(t *testing.T) { // DROP SERIES statement { - s: `DROP SERIES myseries`, - stmt: &influxql.DropSeriesStatement{Name: "myseries"}, + s: `DROP SERIES 1`, + stmt: &influxql.DropSeriesStatement{SeriesID: 1}, + }, + { + s: `DROP SERIES FROM src`, + stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}}, + }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, }, // SHOW CONTINUOUS QUERIES statement @@ -604,7 +619,8 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`}, {s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`}, {s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, - {s: `DROP SERIES`, err: `found EOF, expected identifier at line 1, char 13`}, + {s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`}, + {s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`}, {s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`}, {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`}, @@ -933,6 +949,41 @@ func TestQuoteIdent(t *testing.T) { } } +// Ensure DropSeriesStatement can convert to a string +func TestDropSeriesStatement_String(t *testing.T) { + var tests = []struct { + s string + stmt influxql.Statement + }{ + { + s: `DROP SERIES 1`, + stmt: &influxql.DropSeriesStatement{SeriesID: 1}, + }, + { + s: `DROP SERIES FROM src`, + stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}}, + }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + } + + for _, test := range tests { + s := test.stmt.String() + if s != test.s { + t.Errorf("error rendering string. expected %s, actual: %s", test.s, s) + } + } +} + func BenchmarkParserParseStatement(b *testing.B) { b.ReportAllocs() s := `SELECT field FROM "series" WHERE value > 10` From 918c5c8ce97417ed5c63534e949b0c889b50bca8 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 17 Feb 2015 16:52:52 -0700 Subject: [PATCH 02/32] one more err test condition for drop series --- influxql/parser_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 783e58a3423..206e47e9bf3 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -621,6 +621,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, {s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`}, {s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`}, + {s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, {s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`}, {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`}, From 27d7f457298f0a4bdc46fa199fc3ac433f7aac26 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 18 Feb 2015 15:20:07 -0700 Subject: [PATCH 03/32] support drop series without from but with where --- influxql/ast.go | 19 +++++++++++-------- influxql/parser.go | 15 +++++++++------ influxql/parser_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 14 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 3d4a02769d9..f994064eae4 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -998,17 +998,20 @@ type DropSeriesStatement struct { // String returns a string representation of the drop series statement. func (s *DropSeriesStatement) String() string { var buf bytes.Buffer - _, _ = buf.WriteString("DROP SERIES ") + i, _ := buf.WriteString("DROP SERIES") if s.Source != nil { - _, _ = buf.WriteString("FROM ") + _, _ = buf.WriteString(" FROM ") _, _ = buf.WriteString(s.Source.String()) - if s.Condition != nil { - _, _ = buf.WriteString(" WHERE ") - _, _ = buf.WriteString(s.Condition.String()) - } - } else { - _, _ = buf.WriteString(fmt.Sprintf("%d", s.SeriesID)) + } + if s.Condition != nil { + _, _ = buf.WriteString(" WHERE ") + _, _ = buf.WriteString(s.Condition.String()) + } + + // If we haven't written any data since the initial statement, then this was a SeriesID statement + if len(buf.String()) == i { + _, _ = buf.WriteString(fmt.Sprintf(" %d", s.SeriesID)) } return buf.String() diff --git a/influxql/parser.go b/influxql/parser.go index 452a9227588..982b447029d 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -867,14 +867,17 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { if stmt.Source, err = p.parseSource(); err != nil { return nil, err } - - // Parse condition: "WHERE EXPR". - if stmt.Condition, err = p.parseCondition(); err != nil { - return nil, err - } - } else { p.unscan() + } + + // Parse condition: "WHERE EXPR". + if stmt.Condition, err = p.parseCondition(); err != nil { + return nil, err + } + + // If they didn't provide a FROM or a WHERE, they need to provide the SeriesID + if stmt.Condition == nil && stmt.Source == nil { var id int id, err = p.parseInt(0, math.MaxUint32) if err != nil { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 206e47e9bf3..45e197b2ba9 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -322,6 +322,16 @@ func TestParser_ParseStatement(t *testing.T) { s: `DROP SERIES FROM src`, stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}}, }, + { + s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, { s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, stmt: &influxql.DropSeriesStatement{ @@ -975,6 +985,27 @@ func TestDropSeriesStatement_String(t *testing.T) { }, }, }, + { + s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Source: &influxql.Measurement{Name: "src"}, + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, + { + s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`, + stmt: &influxql.DropSeriesStatement{ + Condition: &influxql.BinaryExpr{ + Op: influxql.EQ, + LHS: &influxql.VarRef{Val: "host"}, + RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"}, + }, + }, + }, } for _, test := range tests { From 6370a4ef2b9fd69d44faeb0a6e7203ed93521aab Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 18 Feb 2015 15:20:19 -0700 Subject: [PATCH 04/32] wip in progress for actually dropping the series data --- server.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 4205425c5f7..2aaedd10417 100644 --- a/server.go +++ b/server.go @@ -68,6 +68,10 @@ const ( createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40) deleteShardGroupMessageType = messaging.MessageType(0x41) + // Series messages + createSeriesIfNotExistsMessageType = messaging.MessageType(0x50) + deleteSeriesMessageType = messaging.MessageType(0x51) + // Measurement messages createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x50) @@ -1504,6 +1508,37 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, return nil } +// DeleteSeries deletes from an existing series. +func (s *Server) DeleteSeries(stmt *influxql.DropSeriesStatement, database string) error { + c := deleteSeriesCommand{} + + if stmt.Source != nil { + measurement := stmt.Source.(*influxql.Measurement) + + m := s.databases[database].measurements[measurement.Name] + if m == nil { + return fmt.Errorf("measurement not found: %s", measurement.Name) + } + + seriesIdsToExpr := make(map[uint32]influxql.Expr) + ids, _, _ := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr) + // TODO check if expr is empty, if not, we got things other than tags + + c.SeriesIDs = ids + + } else { + + c.SeriesIDs = append(c.SeriesIDs, stmt.SeriesID) + } + + _, err := s.broadcast(deleteSeriesMessageType, c) + return err +} + +type deleteSeriesCommand struct { + SeriesIDs []uint32 `json:"seriesIds"` +} + // Point defines the values that will be written to the database type Point struct { Name string @@ -1872,7 +1907,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowUsersStatement: res = s.executeShowUsersStatement(stmt, user) case *influxql.DropSeriesStatement: - continue + res = s.executeDeleteSeriesStatement(stmt, database, user) case *influxql.ShowSeriesStatement: res = s.executeShowSeriesStatement(stmt, database, user) case *influxql.ShowMeasurementsStatement: @@ -2016,6 +2051,16 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U return &Result{Err: s.DeleteUser(q.Name)} } +func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) *Result { + return &Result{Err: s.DeleteSeries(stmt, database)} +} + +func (s *Server) executeDeleteSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { + // TODO + + return &Result{} +} + func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result { s.mu.RLock() defer s.mu.RUnlock() From 7cfc2a5601bf5fd747b6c68afd3388fa6995df9a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 19 Feb 2015 17:25:18 -0700 Subject: [PATCH 05/32] wip --- metastore.go | 15 ++++++++ server.go | 103 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 94 insertions(+), 24 deletions(-) diff --git a/metastore.go b/metastore.go index e956ad6150d..37e6d1d08f1 100644 --- a/metastore.go +++ b/metastore.go @@ -9,6 +9,11 @@ import ( "github.com/boltdb/bolt" ) +var ( + databasesBucket = []byte("Databases") + seriesBucket = []byte("Series") +) + // metastore represents the low-level data store for metadata. type metastore struct { db *bolt.DB @@ -240,6 +245,16 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* return s, nil } +func (tx *metatx) deleteSeries(database, name string, seriesID uint32) error { + m := tx.Bucket(databasesBucket).Bucket([]byte(database)).Bucket(seriesBucket).Bucket(name) + + err := db.Bucket([]byte("Series")).DeleteBucket([]byte(id)) + if err != nil { + return err + } + return nil +} + // loops through all the measurements and series in a database func (tx *metatx) indexDatabase(db *database) { // get the bucket that holds series data for the database diff --git a/server.go b/server.go index 2aaedd10417..d9183689163 100644 --- a/server.go +++ b/server.go @@ -1508,34 +1508,44 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, return nil } -// DeleteSeries deletes from an existing series. -func (s *Server) DeleteSeries(stmt *influxql.DropSeriesStatement, database string) error { - c := deleteSeriesCommand{} - - if stmt.Source != nil { - measurement := stmt.Source.(*influxql.Measurement) - - m := s.databases[database].measurements[measurement.Name] - if m == nil { - return fmt.Errorf("measurement not found: %s", measurement.Name) - } - - seriesIdsToExpr := make(map[uint32]influxql.Expr) - ids, _, _ := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr) - // TODO check if expr is empty, if not, we got things other than tags +func (s *Server) applyDeleteSeries(m *messaging.Message) error { + var c deleteSeriesCommand + mustUnmarshalJSON(m.Data, &c) - c.SeriesIDs = ids + s.mu.Lock() + defer s.mu.Unlock() + if s.databases[c.Database] == nil { + return ErrDatabaseNotFound + } - } else { + // Remove from metastore. + err := s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteSeries(c.Database, c.SeriesIDs) }) + if err != nil { + return err + } - c.SeriesIDs = append(c.SeriesIDs, stmt.SeriesID) + // Delete the database entry. + for _, id := range c.SeriesIDs { + delete(s.databases[c.Database].series, id) } + return nil +} + +type createSeriesIfNotExistsCommand struct { + Database string `json:"database"` + Name string `json:"name"` + Tags map[string]string `json:"tags"` +} +// DeleteSeries deletes from an existing series. +func (s *Server) DeleteSeries(database string, SeriesIDs ...uint32) error { + c := deleteSeriesCommand{Database: database, SeriesIDs: SeriesIDs} _, err := s.broadcast(deleteSeriesMessageType, c) return err } type deleteSeriesCommand struct { + Database string `json:"datbase"` SeriesIDs []uint32 `json:"seriesIds"` } @@ -1907,7 +1917,7 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re case *influxql.ShowUsersStatement: res = s.executeShowUsersStatement(stmt, user) case *influxql.DropSeriesStatement: - res = s.executeDeleteSeriesStatement(stmt, database, user) + res = s.executeDropSeriesStatement(stmt, database, user) case *influxql.ShowSeriesStatement: res = s.executeShowSeriesStatement(stmt, database, user) case *influxql.ShowMeasurementsStatement: @@ -2051,13 +2061,56 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U return &Result{Err: s.DeleteUser(q.Name)} } -func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) *Result { - return &Result{Err: s.DeleteSeries(stmt, database)} -} +func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { + s.mu.RLock() + defer s.mu.RUnlock() + + // Handle the simple `DROP SERIES ` case. + if stmt.Source == nil && stmt.Condition == nil { + return &Result{Err: s.DeleteSeries(database, stmt.SeriesID)} + } -func (s *Server) executeDeleteSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { - // TODO + // Handle the more complicated `DROP SERIES` with sources and/or conditions... + // Find the database. + db := s.databases[database] + if db == nil { + return &Result{Err: ErrDatabaseNotFound} + } + + // Get the list of measurements we're interested in. + measurements, err := measurementsFromSourceOrDB(stmt.Source, db) + if err != nil { + return &Result{Err: err} + } + + for _, m := range measurements { + var ids seriesIDs + + if stmt.Condition != nil { + // Get series IDs that match the WHERE clause. + filters := map[uint32]influxql.Expr{} + ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) + + // If no series matched, then go to the next measurement. + if len(ids) == 0 { + continue + } + + // TODO: check return of walkWhereForSeriesIds for fields + } else { + // No WHERE clause so get all series IDs for this measurement. + ids = m.seriesIDs + } + + // Delete series by ID. + for _, id := range ids { + err := s.DeleteSeries(database, id) + if err != nil { + return &Result{Err: err} + } + } + } return &Result{} } @@ -2735,6 +2788,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) { err = s.applySetPrivilege(m) case createContinuousQueryMessageType: err = s.applyCreateContinuousQueryCommand(m) + case deleteSeriesMessageType: + err = s.applyDeleteSeries(m) } // Sync high water mark and errors. From 6d5be3da25889f57954c623bde87e44871f6e16a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 10:58:21 -0700 Subject: [PATCH 06/32] wip. compiles now. tests next --- database.go | 11 +++++++++++ metastore.go | 17 ++++++++--------- server.go | 19 +++++++++++++++---- shard.go | 17 +++++++++++++++-- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/database.go b/database.go index a91e7212e70..f9799dabc77 100644 --- a/database.go +++ b/database.go @@ -1002,6 +1002,17 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup { return nil } +// deleteSeries will delete all data with the seriesID +func (rp *RetentionPolicy) deleteSeries(seriesID uint32) error { + for _, g := range rp.shardGroups { + err := g.deleteSeries(seriesID) + if err != nil { + return err + } + } + return nil +} + func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) { for i, g := range rp.shardGroups { if g.ID == shardID { diff --git a/metastore.go b/metastore.go index 37e6d1d08f1..b0be265e04e 100644 --- a/metastore.go +++ b/metastore.go @@ -9,11 +9,6 @@ import ( "github.com/boltdb/bolt" ) -var ( - databasesBucket = []byte("Databases") - seriesBucket = []byte("Series") -) - // metastore represents the low-level data store for metadata. type metastore struct { db *bolt.DB @@ -246,12 +241,16 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* } func (tx *metatx) deleteSeries(database, name string, seriesID uint32) error { - m := tx.Bucket(databasesBucket).Bucket([]byte(database)).Bucket(seriesBucket).Bucket(name) + measurmentBucket := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) - err := db.Bucket([]byte("Series")).DeleteBucket([]byte(id)) - if err != nil { - return err + c := measurmentBucket.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + id := btou32(k) + if id == seriesID { + c.Delete() + } } + return nil } diff --git a/server.go b/server.go index d9183689163..a4697414d87 100644 --- a/server.go +++ b/server.go @@ -1514,14 +1514,25 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { s.mu.Lock() defer s.mu.Unlock() - if s.databases[c.Database] == nil { + database := s.databases[c.Database] + if database == nil { return ErrDatabaseNotFound } // Remove from metastore. - err := s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteSeries(c.Database, c.SeriesIDs) }) - if err != nil { - return err + for _, seriesID := range c.SeriesIDs { + series := database.series[seriesID] + if series == nil { + return fmt.Errorf("series not found for id %d", seriesID) + } + m := series.measurement + if m == nil { + return fmt.Errorf("measurement not found for series %d", seriesID) + } + err := s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteSeries(c.Database, m.Name, seriesID) }) + if err != nil { + return err + } } // Delete the database entry. diff --git a/shard.go b/shard.go index abf376f5251..e361c1dc197 100644 --- a/shard.go +++ b/shard.go @@ -45,6 +45,17 @@ func newShardGroup() *ShardGroup { return &ShardGroup{} } // Duration returns the duration between the shard group's start and end time. func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) } +// deleteSeries will delete all data with the seriesID +func (g *ShardGroup) deleteSeries(seriesID uint32) error { + for _, s := range g.Shards { + err := s.deleteSeries(seriesID) + if err != nil { + return err + } + } + return nil +} + // newShard returns a new initialized Shard instance. func newShard() *Shard { return &Shard{} } @@ -145,8 +156,10 @@ func (s *Shard) writeSeries(batch []byte) error { }) } -func (s *Shard) deleteSeries(name string) error { - panic("not yet implemented") // TODO +func (s *Shard) deleteSeries(seriesID uint32) error { + return s.store.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket(u32tob(seriesID)) + }) } // Shards represents a list of shards. From 8306057179bffd4f255350ad0d0fa0ebb6d756ea Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 13:30:40 -0700 Subject: [PATCH 07/32] actually drop shard data --- server.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server.go b/server.go index a4697414d87..73c912475d2 100644 --- a/server.go +++ b/server.go @@ -1535,6 +1535,15 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { } } + for _, rp := range s.databases[c.Database].policies { + for _, id := range c.SeriesIDs { + err := rp.deleteSeries(id) + if err != nil { + return err + } + } + } + // Delete the database entry. for _, id := range c.SeriesIDs { delete(s.databases[c.Database].series, id) From c2daa18aae3daed514ecd809f56a43d651e9dab2 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 13:30:52 -0700 Subject: [PATCH 08/32] first test for dropping series --- server_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/server_test.go b/server_test.go index f9fd351c524..42108137d2d 100644 --- a/server_test.go +++ b/server_test.go @@ -830,6 +830,32 @@ func TestServer_WriteSeries(t *testing.T) { } } +// Ensure the server can drop a series. +func TestServer_DropSeries(t *testing.T) { + c := NewMessagingClient() + s := OpenServer(c) + defer s.Close() + s.CreateDatabase("foo") + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 1 * time.Hour}) + s.CreateUser("susy", "pass", false) + + // Write series with one point to the database. + tags := map[string]string{"host": "servera.influx.com", "region": "uswest"} + index, err := s.WriteSeries("foo", "mypolicy", []influxdb.Point{{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + // Drop the first series + if err := s.DeleteSeries("foo", 1); err != nil { + t.Fatal(err) + } else if s.SeriesExists("foo", 1) { + t.Fatalf("series not actually dropped") + } +} + // Ensure the server can execute a query and return the data correctly. func TestServer_ExecuteQuery(t *testing.T) { s := OpenServer(NewMessagingClient()) From 4948b6278412c5aadf8294eb0646b3f216f0e55f Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 15:06:22 -0700 Subject: [PATCH 09/32] first working version of drop series. more testing to come --- database.go | 39 +++++++++++++++++++++++++++++++++++++++ httpd/handler_test.go | 21 +++++++++++++++++++++ metastore.go | 10 ++++++---- server.go | 19 +++++++++++++++++-- 4 files changed, 83 insertions(+), 6 deletions(-) diff --git a/database.go b/database.go index f9799dabc77..4fd40e116e5 100644 --- a/database.go +++ b/database.go @@ -221,6 +221,34 @@ func (m *Measurement) addSeries(s *Series) bool { return true } +// removeSeries will remove a series from the measurementIndex. Returns true if already removed +func (m *Measurement) removeSeries(seriesID uint32) bool { + if _, ok := m.seriesByID[seriesID]; !ok { + return true + } + s := m.seriesByID[seriesID] + tagset := string(marshalTags(s.Tags)) + + delete(m.series, tagset) + delete(m.seriesByID, seriesID) + + var ids []uint32 + for _, id := range m.seriesIDs { + if id != seriesID { + ids = append(ids, id) + } + } + m.seriesIDs = ids + sort.Sort(m.seriesIDs) + + // add this series id to the tag index on the measurement + for k, _ := range s.Tags { + delete(m.seriesByTagKeyValue, k) + } + + return true +} + // seriesByTags returns the Series that matches the given tagset. func (m *Measurement) seriesByTags(tags map[string]string) *Series { return m.series[string(marshalTags(tags))] @@ -1086,6 +1114,17 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool { return idx.addSeries(s) } +// removeSeriesFromIndex removes the series from the index +func (db *database) removeSeriesFromIndex(seriesID uint32) bool { + // if the series is already gone, return + if db.series[seriesID] == nil { + return true + } + + delete(db.series, seriesID) + return true +} + // createMeasurementIfNotExists will either add a measurement object to the index or return the existing one. func (db *database) createMeasurementIfNotExists(name string) *Measurement { idx := db.measurements[name] diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 93a7e9af6d9..a5402d2b426 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1156,6 +1156,27 @@ func TestHandler_ShowContinuousQueries(t *testing.T) { } +func TestHandler_DropSeries(t *testing.T) { + srvr := OpenAuthlessServer(NewMessagingClient()) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } + + query := map[string]string{"db": "foo", "q": "DROP SERIES FROM cpu"} + status, _ = MustHTTP("GET", s.URL+`/query`, query, nil, "") + + if status != http.StatusOK { + t.Fatalf("unexpected status: %d", status) + } +} + func TestHandler_serveWriteSeries(t *testing.T) { srvr := OpenAuthenticatedServer(NewMessagingClient()) srvr.CreateDatabase("foo") diff --git a/metastore.go b/metastore.go index b0be265e04e..8b16906b8f3 100644 --- a/metastore.go +++ b/metastore.go @@ -4,7 +4,6 @@ import ( "encoding/binary" "fmt" "time" - "unsafe" "github.com/boltdb/bolt" ) @@ -231,8 +230,7 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* // store the tag map for the series s := &Series{ID: uint32(id), Tags: tags} - idBytes := make([]byte, 4) - *(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id) + idBytes := u32tob(uint32(id)) if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil { return nil, err @@ -244,10 +242,14 @@ func (tx *metatx) deleteSeries(database, name string, seriesID uint32) error { measurmentBucket := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) c := measurmentBucket.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { id := btou32(k) if id == seriesID { - c.Delete() + err := c.Delete() + if err != nil { + return err + } } } diff --git a/server.go b/server.go index 73c912475d2..77f7dfa8d89 100644 --- a/server.go +++ b/server.go @@ -1535,6 +1535,7 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { } } + // Remove shard data for _, rp := range s.databases[c.Database].policies { for _, id := range c.SeriesIDs { err := rp.deleteSeries(id) @@ -1546,7 +1547,18 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { // Delete the database entry. for _, id := range c.SeriesIDs { - delete(s.databases[c.Database].series, id) + if !database.removeSeriesFromIndex(id) { + return fmt.Errorf("failed to remove series id %d from index", id) + } + } + + // Delete measurment references to the series + for _, m := range database.measurements { + for _, id := range c.SeriesIDs { + if !m.removeSeries(id) { + return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) + } + } } return nil } @@ -2083,10 +2095,10 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { s.mu.RLock() - defer s.mu.RUnlock() // Handle the simple `DROP SERIES ` case. if stmt.Source == nil && stmt.Condition == nil { + s.mu.RUnlock() return &Result{Err: s.DeleteSeries(database, stmt.SeriesID)} } @@ -2095,12 +2107,14 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, // Find the database. db := s.databases[database] if db == nil { + s.mu.RUnlock() return &Result{Err: ErrDatabaseNotFound} } // Get the list of measurements we're interested in. measurements, err := measurementsFromSourceOrDB(stmt.Source, db) if err != nil { + s.mu.RUnlock() return &Result{Err: err} } @@ -2123,6 +2137,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, ids = m.seriesIDs } + s.mu.RUnlock() // Delete series by ID. for _, id := range ids { err := s.DeleteSeries(database, id) From 495cedb53bf9d2ccd45554999bbf292890e535c8 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 15:20:25 -0700 Subject: [PATCH 10/32] move where we unlock --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 77f7dfa8d89..84f5d294b3c 100644 --- a/server.go +++ b/server.go @@ -2118,6 +2118,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, return &Result{Err: err} } + s.mu.RUnlock() for _, m := range measurements { var ids seriesIDs @@ -2137,7 +2138,6 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, ids = m.seriesIDs } - s.mu.RUnlock() // Delete series by ID. for _, id := range ids { err := s.DeleteSeries(database, id) From 0d6b7616de0fc901cda82ef8de94ffdc4d4224e1 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 18:24:27 -0700 Subject: [PATCH 11/32] accidentally lost this in my rebase --- server.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server.go b/server.go index 84f5d294b3c..12657cd939a 100644 --- a/server.go +++ b/server.go @@ -1486,6 +1486,17 @@ func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement return } +// SeriesExists returns true if a series exists. +func (s *Server) SeriesExists(name string, id uint32) bool { + s.mu.RLock() + defer s.mu.RUnlock() + d := s.databases[name] + if d == nil { + return false + } + return d.series[id] != nil +} + // addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already // present. It will return an error if the field is present in the command, but is of a different type. func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error { From 697e9721dd5bb61807773b583b132f2744ea1c60 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 18:30:01 -0700 Subject: [PATCH 12/32] fixing rebase conflicts --- server.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/server.go b/server.go index 12657cd939a..93d2a7b4891 100644 --- a/server.go +++ b/server.go @@ -69,20 +69,19 @@ const ( deleteShardGroupMessageType = messaging.MessageType(0x41) // Series messages - createSeriesIfNotExistsMessageType = messaging.MessageType(0x50) - deleteSeriesMessageType = messaging.MessageType(0x51) + deleteSeriesMessageType = messaging.MessageType(0x50) // Measurement messages - createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x50) + createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60) // Continuous Query messages - createContinuousQueryMessageType = messaging.MessageType(0x60) + createContinuousQueryMessageType = messaging.MessageType(0760) // Write series data messages (per-topic) - writeRawSeriesMessageType = messaging.MessageType(0x70) + writeRawSeriesMessageType = messaging.MessageType(0x80) // Privilege messages - setPrivilegeMessageType = messaging.MessageType(0x80) + setPrivilegeMessageType = messaging.MessageType(0x90) ) // Server represents a collection of metadata and raw metric data. @@ -1574,12 +1573,6 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { return nil } -type createSeriesIfNotExistsCommand struct { - Database string `json:"database"` - Name string `json:"name"` - Tags map[string]string `json:"tags"` -} - // DeleteSeries deletes from an existing series. func (s *Server) DeleteSeries(database string, SeriesIDs ...uint32) error { c := deleteSeriesCommand{Database: database, SeriesIDs: SeriesIDs} From 5ab3731159fe0b200ed88aa2f6916a5c9697dc4a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Fri, 20 Feb 2015 18:36:15 -0700 Subject: [PATCH 13/32] delete -> drop for series naming --- database.go | 6 +++--- metastore.go | 2 +- server.go | 32 ++++++++++++++++---------------- server_test.go | 2 +- shard.go | 8 ++++---- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/database.go b/database.go index 4fd40e116e5..2b08a991707 100644 --- a/database.go +++ b/database.go @@ -1030,10 +1030,10 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup { return nil } -// deleteSeries will delete all data with the seriesID -func (rp *RetentionPolicy) deleteSeries(seriesID uint32) error { +// dropSeries will delete all data with the seriesID +func (rp *RetentionPolicy) dropSeries(seriesID uint32) error { for _, g := range rp.shardGroups { - err := g.deleteSeries(seriesID) + err := g.dropSeries(seriesID) if err != nil { return err } diff --git a/metastore.go b/metastore.go index 8b16906b8f3..d4be41f5b63 100644 --- a/metastore.go +++ b/metastore.go @@ -238,7 +238,7 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* return s, nil } -func (tx *metatx) deleteSeries(database, name string, seriesID uint32) error { +func (tx *metatx) dropSeries(database, name string, seriesID uint32) error { measurmentBucket := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) c := measurmentBucket.Cursor() diff --git a/server.go b/server.go index 93d2a7b4891..893ebf25a41 100644 --- a/server.go +++ b/server.go @@ -69,7 +69,7 @@ const ( deleteShardGroupMessageType = messaging.MessageType(0x41) // Series messages - deleteSeriesMessageType = messaging.MessageType(0x50) + dropSeriesMessageType = messaging.MessageType(0x50) // Measurement messages createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60) @@ -1518,8 +1518,8 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, return nil } -func (s *Server) applyDeleteSeries(m *messaging.Message) error { - var c deleteSeriesCommand +func (s *Server) applyDropSeries(m *messaging.Message) error { + var c dropSeriesCommand mustUnmarshalJSON(m.Data, &c) s.mu.Lock() @@ -1535,11 +1535,11 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { if series == nil { return fmt.Errorf("series not found for id %d", seriesID) } - m := series.measurement - if m == nil { + measurement := series.measurement + if measurement == nil { return fmt.Errorf("measurement not found for series %d", seriesID) } - err := s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteSeries(c.Database, m.Name, seriesID) }) + err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropSeries(c.Database, measurement.Name, seriesID) }) if err != nil { return err } @@ -1548,7 +1548,7 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { // Remove shard data for _, rp := range s.databases[c.Database].policies { for _, id := range c.SeriesIDs { - err := rp.deleteSeries(id) + err := rp.dropSeries(id) if err != nil { return err } @@ -1573,14 +1573,14 @@ func (s *Server) applyDeleteSeries(m *messaging.Message) error { return nil } -// DeleteSeries deletes from an existing series. -func (s *Server) DeleteSeries(database string, SeriesIDs ...uint32) error { - c := deleteSeriesCommand{Database: database, SeriesIDs: SeriesIDs} - _, err := s.broadcast(deleteSeriesMessageType, c) +// DropSeries deletes from an existing series. +func (s *Server) DropSeries(database string, SeriesIDs ...uint32) error { + c := dropSeriesCommand{Database: database, SeriesIDs: SeriesIDs} + _, err := s.broadcast(dropSeriesMessageType, c) return err } -type deleteSeriesCommand struct { +type dropSeriesCommand struct { Database string `json:"datbase"` SeriesIDs []uint32 `json:"seriesIds"` } @@ -2103,7 +2103,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, // Handle the simple `DROP SERIES ` case. if stmt.Source == nil && stmt.Condition == nil { s.mu.RUnlock() - return &Result{Err: s.DeleteSeries(database, stmt.SeriesID)} + return &Result{Err: s.DropSeries(database, stmt.SeriesID)} } // Handle the more complicated `DROP SERIES` with sources and/or conditions... @@ -2144,7 +2144,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, // Delete series by ID. for _, id := range ids { - err := s.DeleteSeries(database, id) + err := s.DropSeries(database, id) if err != nil { return &Result{Err: err} } @@ -2827,8 +2827,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) { err = s.applySetPrivilege(m) case createContinuousQueryMessageType: err = s.applyCreateContinuousQueryCommand(m) - case deleteSeriesMessageType: - err = s.applyDeleteSeries(m) + case dropSeriesMessageType: + err = s.applyDropSeries(m) } // Sync high water mark and errors. diff --git a/server_test.go b/server_test.go index 42108137d2d..eceec7e5aa7 100644 --- a/server_test.go +++ b/server_test.go @@ -849,7 +849,7 @@ func TestServer_DropSeries(t *testing.T) { } // Drop the first series - if err := s.DeleteSeries("foo", 1); err != nil { + if err := s.DropSeries("foo", 1); err != nil { t.Fatal(err) } else if s.SeriesExists("foo", 1) { t.Fatalf("series not actually dropped") diff --git a/shard.go b/shard.go index e361c1dc197..23264fcf5c1 100644 --- a/shard.go +++ b/shard.go @@ -45,10 +45,10 @@ func newShardGroup() *ShardGroup { return &ShardGroup{} } // Duration returns the duration between the shard group's start and end time. func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) } -// deleteSeries will delete all data with the seriesID -func (g *ShardGroup) deleteSeries(seriesID uint32) error { +// dropSeries will delete all data with the seriesID +func (g *ShardGroup) dropSeries(seriesID uint32) error { for _, s := range g.Shards { - err := s.deleteSeries(seriesID) + err := s.dropSeries(seriesID) if err != nil { return err } @@ -156,7 +156,7 @@ func (s *Shard) writeSeries(batch []byte) error { }) } -func (s *Shard) deleteSeries(seriesID uint32) error { +func (s *Shard) dropSeries(seriesID uint32) error { return s.store.Update(func(tx *bolt.Tx) error { return tx.DeleteBucket(u32tob(seriesID)) }) From 9a6e29ece8ff9d6c32a72c397f9066cc07a71bca Mon Sep 17 00:00:00 2001 From: David Norton Date: Sat, 21 Feb 2015 18:05:14 -0500 Subject: [PATCH 14/32] collect all series IDs then unlock --- server.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/server.go b/server.go index 893ebf25a41..d4a97410afe 100644 --- a/server.go +++ b/server.go @@ -2122,34 +2122,32 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, return &Result{Err: err} } - s.mu.RUnlock() + var ids seriesIDs for _, m := range measurements { - var ids seriesIDs - + var tmpIDs seriesIDs if stmt.Condition != nil { // Get series IDs that match the WHERE clause. filters := map[uint32]influxql.Expr{} - ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) - - // If no series matched, then go to the next measurement. - if len(ids) == 0 { - continue - } + tmpIDs, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) // TODO: check return of walkWhereForSeriesIds for fields } else { // No WHERE clause so get all series IDs for this measurement. - ids = m.seriesIDs + tmpIDs = m.seriesIDs } - // Delete series by ID. - for _, id := range ids { - err := s.DropSeries(database, id) - if err != nil { - return &Result{Err: err} - } + ids = ids.union(tmpIDs) + } + s.mu.RUnlock() + + // Delete series by ID. + for _, id := range ids { + err := s.DropSeries(database, id) + if err != nil { + return &Result{Err: err} } } + return &Result{} } From 61352f78b6e812cbbc225ff5b932dc792f5d26bc Mon Sep 17 00:00:00 2001 From: David Norton Date: Sat, 21 Feb 2015 18:33:00 -0500 Subject: [PATCH 15/32] pass all series IDs to Server.DropSeries at once --- server.go | 16 ++++------------ server_test.go | 2 +- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/server.go b/server.go index d4a97410afe..0a9ef307023 100644 --- a/server.go +++ b/server.go @@ -1574,8 +1574,8 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { } // DropSeries deletes from an existing series. -func (s *Server) DropSeries(database string, SeriesIDs ...uint32) error { - c := dropSeriesCommand{Database: database, SeriesIDs: SeriesIDs} +func (s *Server) DropSeries(database string, seriesIDs []uint32) error { + c := dropSeriesCommand{Database: database, SeriesIDs: seriesIDs} _, err := s.broadcast(dropSeriesMessageType, c) return err } @@ -2103,7 +2103,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, // Handle the simple `DROP SERIES ` case. if stmt.Source == nil && stmt.Condition == nil { s.mu.RUnlock() - return &Result{Err: s.DropSeries(database, stmt.SeriesID)} + return &Result{Err: s.DropSeries(database, []uint32{stmt.SeriesID})} } // Handle the more complicated `DROP SERIES` with sources and/or conditions... @@ -2140,15 +2140,7 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, } s.mu.RUnlock() - // Delete series by ID. - for _, id := range ids { - err := s.DropSeries(database, id) - if err != nil { - return &Result{Err: err} - } - } - - return &Result{} + return &Result{Err: s.DropSeries(database, []uint32(ids))} } func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result { diff --git a/server_test.go b/server_test.go index eceec7e5aa7..cf69aa384f1 100644 --- a/server_test.go +++ b/server_test.go @@ -849,7 +849,7 @@ func TestServer_DropSeries(t *testing.T) { } // Drop the first series - if err := s.DropSeries("foo", 1); err != nil { + if err := s.DropSeries("foo", []uint32{1}); err != nil { t.Fatal(err) } else if s.SeriesExists("foo", 1) { t.Fatalf("series not actually dropped") From 6114c8138ef60ba3f3fda94602a0cafe6cbf0e6a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 16:37:14 -0700 Subject: [PATCH 16/32] refactoring drop series --- database.go | 26 ++++++++++++++++++-------- server.go | 40 ++++++++++++++++------------------------ 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/database.go b/database.go index 2b08a991707..2232969d2b7 100644 --- a/database.go +++ b/database.go @@ -222,7 +222,7 @@ func (m *Measurement) addSeries(s *Series) bool { } // removeSeries will remove a series from the measurementIndex. Returns true if already removed -func (m *Measurement) removeSeries(seriesID uint32) bool { +func (m *Measurement) dropSeries(seriesID uint32) bool { if _, ok := m.seriesByID[seriesID]; !ok { return true } @@ -1114,15 +1114,25 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool { return idx.addSeries(s) } -// removeSeriesFromIndex removes the series from the index -func (db *database) removeSeriesFromIndex(seriesID uint32) bool { - // if the series is already gone, return - if db.series[seriesID] == nil { - return true +// dropSeries removes the series from the in memory references +func (db *database) dropSeries(seriesIDs ...uint32) error { + for _, id := range seriesIDs { + // if the series is already gone, return + if db.series[id] == nil { + continue + } + + delete(db.series, id) + + // Remove series information from measurements + for _, m := range db.measurements { + if !m.dropSeries(id) { + return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) + } + } } - delete(db.series, seriesID) - return true + return nil } // createMeasurementIfNotExists will either add a measurement object to the index or return the existing one. diff --git a/server.go b/server.go index 0a9ef307023..ff1305df3b8 100644 --- a/server.go +++ b/server.go @@ -1539,37 +1539,29 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { if measurement == nil { return fmt.Errorf("measurement not found for series %d", seriesID) } - err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropSeries(c.Database, measurement.Name, seriesID) }) - if err != nil { - return err - } - } - - // Remove shard data - for _, rp := range s.databases[c.Database].policies { - for _, id := range c.SeriesIDs { - err := rp.dropSeries(id) - if err != nil { + err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { + if err := tx.dropSeries(c.Database, measurement.Name, seriesID); err != nil { return err } - } - } - // Delete the database entry. - for _, id := range c.SeriesIDs { - if !database.removeSeriesFromIndex(id) { - return fmt.Errorf("failed to remove series id %d from index", id) - } - } + // Remove shard data + for _, rp := range s.databases[c.Database].policies { + if err := rp.dropSeries(seriesID); err != nil { + return err + } + } - // Delete measurment references to the series - for _, m := range database.measurements { - for _, id := range c.SeriesIDs { - if !m.removeSeries(id) { - return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) + // Delete the database entry. + if err := database.dropSeries(c.SeriesIDs...); err != nil { + return fmt.Errorf("failed to remove series from index") } + return nil + }) + if err != nil { + return err } } + return nil } From 2752ada587a09950c213ea3f553d9b41e9ca01fd Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 16:45:16 -0700 Subject: [PATCH 17/32] moving more responsibility to database.dropSeries --- database.go | 7 +++++++ server.go | 9 +-------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/database.go b/database.go index 2232969d2b7..54c7b20617d 100644 --- a/database.go +++ b/database.go @@ -1130,6 +1130,13 @@ func (db *database) dropSeries(seriesIDs ...uint32) error { return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) } } + + // Remove shard data + for _, rp := range db.policies { + if err := rp.dropSeries(id); err != nil { + return err + } + } } return nil diff --git a/server.go b/server.go index ff1305df3b8..36580ddcd8b 100644 --- a/server.go +++ b/server.go @@ -1544,14 +1544,7 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { return err } - // Remove shard data - for _, rp := range s.databases[c.Database].policies { - if err := rp.dropSeries(seriesID); err != nil { - return err - } - } - - // Delete the database entry. + // Delete series from the database. if err := database.dropSeries(c.SeriesIDs...); err != nil { return fmt.Errorf("failed to remove series from index") } From 5df9726c3bb7827fcbac6670470b0f4450ad6bf8 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 17:00:00 -0700 Subject: [PATCH 18/32] fix deleting from crazy map --- database.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/database.go b/database.go index 54c7b20617d..9bcc16d3b5d 100644 --- a/database.go +++ b/database.go @@ -241,9 +241,30 @@ func (m *Measurement) dropSeries(seriesID uint32) bool { m.seriesIDs = ids sort.Sort(m.seriesIDs) - // add this series id to the tag index on the measurement - for k, _ := range s.Tags { - delete(m.seriesByTagKeyValue, k) + // remove this series id to the tag index on the measurement + // s.seriesByTagKeyValue is defined as map[string]map[string]seriesIDs + for k, v := range m.seriesByTagKeyValue { + values := v + for kk, vv := range values { + var ids []uint32 + for _, id := range vv { + if id != seriesID { + ids = append(ids, id) + } + } + // Check to see if we have any ids, if not, remove the key + if len(ids) == 0 { + delete(values, kk) + } else { + values[kk] = ids + } + } + // If we have no values, then we delete the key + if len(values) == 0 { + delete(m.series, k) + } else { + m.seriesByTagKeyValue[k] = values + } } return true From 7d74ccac6a0d3a0390bd42ccfb13133b9866a1a2 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 17:02:10 -0700 Subject: [PATCH 19/32] addressing nit --- influxql/parser.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/influxql/parser.go b/influxql/parser.go index 982b447029d..553cf950955 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -861,8 +861,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { stmt := &DropSeriesStatement{} var err error - tok, _, _ := p.scanIgnoreWhitespace() - if tok == FROM { + if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM { // Parse source. if stmt.Source, err = p.parseSource(); err != nil { return nil, err From c575ee9a0489277864af6f1a37d1d93be09e6f0c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 19:13:01 -0700 Subject: [PATCH 20/32] fixing a fat fingering rebase --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 36580ddcd8b..84e1c010408 100644 --- a/server.go +++ b/server.go @@ -75,7 +75,7 @@ const ( createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60) // Continuous Query messages - createContinuousQueryMessageType = messaging.MessageType(0760) + createContinuousQueryMessageType = messaging.MessageType(0x70) // Write series data messages (per-topic) writeRawSeriesMessageType = messaging.MessageType(0x80) From d35e2a92b11062f96eb630fcb202a1a3fa1c6bea Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 19:14:00 -0700 Subject: [PATCH 21/32] locking is now done in process messages --- server.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server.go b/server.go index 84e1c010408..83316304b2e 100644 --- a/server.go +++ b/server.go @@ -1522,8 +1522,6 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { var c dropSeriesCommand mustUnmarshalJSON(m.Data, &c) - s.mu.Lock() - defer s.mu.Unlock() database := s.databases[c.Database] if database == nil { return ErrDatabaseNotFound From da10fa8e8005cc6c759d94d084efa4552bcf947a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 20:06:58 -0700 Subject: [PATCH 22/32] in depth drop series test --- server_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/server_test.go b/server_test.go index cf69aa384f1..7f449fc4e14 100644 --- a/server_test.go +++ b/server_test.go @@ -856,6 +856,87 @@ func TestServer_DropSeries(t *testing.T) { } } +// Ensure Drop Series can: +// write to measurement cpu with tags region=uswest host=serverA +// write to measurement cpu with tags region=uswest host=serverB +// drop one of those series +// ensure that the dropped series is gone +// ensure that we can still query: select value from cpu where region=uswest +func TestServer_DropSeriesMultipleMeasurements(t *testing.T) { + c := NewMessagingClient() + s := OpenServer(c) + defer s.Close() + s.CreateDatabase("foo") + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.SetDefaultRetentionPolicy("foo", "raw") + s.CreateUser("susy", "pass", false) + + // Write series with one point to the database. + tags := map[string]string{"host": "serverA", "region": "uswest"} + index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + tags = map[string]string{"host": "serverB", "region": "uswest"} + index, err = s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:01Z"), Values: map[string]interface{}{"value": float64(33.2)}}}) + if err != nil { + t.Fatal(err) + } else if err = s.Sync(index); err != nil { + t.Fatalf("sync error: %s", err) + } + + results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"],["serverB","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu where host='serverA'`), "foo", nil) + if results.Error() != nil { + t.Fatalf("unexpected error: %s", results.Error()) + } + + results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverB","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverA'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 0 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverB'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + + results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where region='uswest'`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } +} + // Ensure the server can execute a query and return the data correctly. func TestServer_ExecuteQuery(t *testing.T) { s := OpenServer(NewMessagingClient()) From b53d02fc05631edb22d6420c635c0f68a824ae4c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 20:20:16 -0700 Subject: [PATCH 23/32] delete correct map --- database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database.go b/database.go index 9bcc16d3b5d..e22e384cc5e 100644 --- a/database.go +++ b/database.go @@ -261,7 +261,7 @@ func (m *Measurement) dropSeries(seriesID uint32) bool { } // If we have no values, then we delete the key if len(values) == 0 { - delete(m.series, k) + delete(m.seriesByTagKeyValue, k) } else { m.seriesByTagKeyValue[k] = values } From d5ac049812d9cfc71e5663132ddaf34063442995 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 20:20:36 -0700 Subject: [PATCH 24/32] remove bogus method --- server.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/server.go b/server.go index 83316304b2e..84680fd295a 100644 --- a/server.go +++ b/server.go @@ -1485,17 +1485,6 @@ func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement return } -// SeriesExists returns true if a series exists. -func (s *Server) SeriesExists(name string, id uint32) bool { - s.mu.RLock() - defer s.mu.RUnlock() - d := s.databases[name] - if d == nil { - return false - } - return d.series[id] != nil -} - // addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already // present. It will return an error if the field is present in the command, but is of a different type. func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error { From 63b6719e39739bebdc8a0ce8ee31c78cb345cc3f Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 20:21:02 -0700 Subject: [PATCH 25/32] better basic drop series test --- server_test.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/server_test.go b/server_test.go index 7f449fc4e14..cc9a2929c57 100644 --- a/server_test.go +++ b/server_test.go @@ -836,24 +836,44 @@ func TestServer_DropSeries(t *testing.T) { s := OpenServer(c) defer s.Close() s.CreateDatabase("foo") - s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 1 * time.Hour}) + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.SetDefaultRetentionPolicy("foo", "raw") s.CreateUser("susy", "pass", false) // Write series with one point to the database. - tags := map[string]string{"host": "servera.influx.com", "region": "uswest"} - index, err := s.WriteSeries("foo", "mypolicy", []influxdb.Point{{Name: "cpu_load", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) + tags := map[string]string{"host": "serverA", "region": "uswest"} + index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(23.2)}}}) if err != nil { t.Fatal(err) } else if err = s.Sync(index); err != nil { t.Fatalf("sync error: %s", err) } - // Drop the first series - if err := s.DropSeries("foo", []uint32{1}); err != nil { - t.Fatal(err) - } else if s.SeriesExists("foo", 1) { - t.Fatalf("series not actually dropped") + // Ensure series exiss + results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]}` { + t.Fatalf("unexpected row(0): %s", s) } + + // Drop series + results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu`), "foo", nil) + if results.Error() != nil { + t.Fatalf("unexpected error: %s", results.Error()) + } + + results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + if res := results.Results[0]; res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } else if len(res.Rows) != 1 { + t.Fatalf("unexpected row count: %d", len(res.Rows)) + } else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":[]}]}` { + t.Fatalf("unexpected row(0): %s", s) + } + } // Ensure Drop Series can: From 3e94c14ef28f807515cdd877d0ac322427cc8502 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 21:15:14 -0700 Subject: [PATCH 26/32] no need to resort --- database.go | 1 - 1 file changed, 1 deletion(-) diff --git a/database.go b/database.go index e22e384cc5e..2e42cec8302 100644 --- a/database.go +++ b/database.go @@ -239,7 +239,6 @@ func (m *Measurement) dropSeries(seriesID uint32) bool { } } m.seriesIDs = ids - sort.Sort(m.seriesIDs) // remove this series id to the tag index on the measurement // s.seriesByTagKeyValue is defined as map[string]map[string]seriesIDs From d059b3e2daedf52d795a728972bb5f78139f2747 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 21:20:28 -0700 Subject: [PATCH 27/32] update test name --- server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_test.go b/server_test.go index cc9a2929c57..69f7baa4c65 100644 --- a/server_test.go +++ b/server_test.go @@ -882,7 +882,7 @@ func TestServer_DropSeries(t *testing.T) { // drop one of those series // ensure that the dropped series is gone // ensure that we can still query: select value from cpu where region=uswest -func TestServer_DropSeriesMultipleMeasurements(t *testing.T) { +func TestServer_DropSeriesTagsPreserved(t *testing.T) { c := NewMessagingClient() s := OpenServer(c) defer s.Close() From 113fcea9c8e36162cb6d9bf9ee686952d3aff332 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 21:37:30 -0700 Subject: [PATCH 28/32] no need to cursor --- metastore.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/metastore.go b/metastore.go index d4be41f5b63..baa9e8d9f62 100644 --- a/metastore.go +++ b/metastore.go @@ -238,21 +238,12 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* return s, nil } +// dropSeries removes all seriesIDS for a given database/measurement func (tx *metatx) dropSeries(database, name string, seriesID uint32) error { - measurmentBucket := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) - - c := measurmentBucket.Cursor() - - for k, _ := c.First(); k != nil; k, _ = c.Next() { - id := btou32(k) - if id == seriesID { - err := c.Delete() - if err != nil { - return err - } - } + b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) + if b != nil { + return b.Delete(u32tob(seriesID)) } - return nil } From 29910c3c99182d36d2bf444f9bc231b22da1f367 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 21:48:18 -0700 Subject: [PATCH 29/32] better transaction batching for drop series --- metastore.go | 14 ++++++++++---- server.go | 26 +++++++++++++++----------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/metastore.go b/metastore.go index baa9e8d9f62..ff52c0f36a1 100644 --- a/metastore.go +++ b/metastore.go @@ -239,10 +239,16 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* } // dropSeries removes all seriesIDS for a given database/measurement -func (tx *metatx) dropSeries(database, name string, seriesID uint32) error { - b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(name)) - if b != nil { - return b.Delete(u32tob(seriesID)) +func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint32) error { + for measurement, ids := range seriesByMeasurement { + b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(measurement)) + if b != nil { + for _, id := range ids { + if err := b.Delete(u32tob(id)); err != nil { + return err + } + } + } } return nil } diff --git a/server.go b/server.go index 84680fd295a..43acaa0f671 100644 --- a/server.go +++ b/server.go @@ -1516,6 +1516,8 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { return ErrDatabaseNotFound } + seriesByMeasurement := make(map[string][]uint32) + // Remove from metastore. for _, seriesID := range c.SeriesIDs { series := database.series[seriesID] @@ -1526,20 +1528,22 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { if measurement == nil { return fmt.Errorf("measurement not found for series %d", seriesID) } - err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { - if err := tx.dropSeries(c.Database, measurement.Name, seriesID); err != nil { - return err - } + seriesByMeasurement[measurement.Name] = append(seriesByMeasurement[measurement.Name], seriesID) + } - // Delete series from the database. - if err := database.dropSeries(c.SeriesIDs...); err != nil { - return fmt.Errorf("failed to remove series from index") - } - return nil - }) - if err != nil { + err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { + if err := tx.dropSeries(c.Database, seriesByMeasurement); err != nil { return err } + + // Delete series from the database. + if err := database.dropSeries(c.SeriesIDs...); err != nil { + return fmt.Errorf("failed to remove series from index") + } + return nil + }) + if err != nil { + return err } return nil From 8637a10b6af2013ccc97149fd53435814ea162c8 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 22:18:25 -0700 Subject: [PATCH 30/32] start with seriesByMeasurement --- database.go | 29 +++++++++++++++-------------- server.go | 48 +++++++++++++++++++++--------------------------- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/database.go b/database.go index 2e42cec8302..4774b62538f 100644 --- a/database.go +++ b/database.go @@ -1135,26 +1135,27 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool { } // dropSeries removes the series from the in memory references -func (db *database) dropSeries(seriesIDs ...uint32) error { - for _, id := range seriesIDs { - // if the series is already gone, return - if db.series[id] == nil { - continue - } +func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error { + for measurement, ids := range seriesByMeasurement { + for _, id := range ids { + // if the series is already gone, return + if db.series[id] == nil { + continue + } - delete(db.series, id) + delete(db.series, id) - // Remove series information from measurements - for _, m := range db.measurements { + // Remove series information from measurements + m := db.measurements[measurement] if !m.dropSeries(id) { return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) } - } - // Remove shard data - for _, rp := range db.policies { - if err := rp.dropSeries(id); err != nil { - return err + // Remove shard data + for _, rp := range db.policies { + if err := rp.dropSeries(id); err != nil { + return err + } } } } diff --git a/server.go b/server.go index 43acaa0f671..d8d56291122 100644 --- a/server.go +++ b/server.go @@ -1516,28 +1516,14 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { return ErrDatabaseNotFound } - seriesByMeasurement := make(map[string][]uint32) - // Remove from metastore. - for _, seriesID := range c.SeriesIDs { - series := database.series[seriesID] - if series == nil { - return fmt.Errorf("series not found for id %d", seriesID) - } - measurement := series.measurement - if measurement == nil { - return fmt.Errorf("measurement not found for series %d", seriesID) - } - seriesByMeasurement[measurement.Name] = append(seriesByMeasurement[measurement.Name], seriesID) - } - err := s.meta.mustUpdate(m.Index, func(tx *metatx) error { - if err := tx.dropSeries(c.Database, seriesByMeasurement); err != nil { + if err := tx.dropSeries(c.Database, c.SeriesByMeasurement); err != nil { return err } // Delete series from the database. - if err := database.dropSeries(c.SeriesIDs...); err != nil { + if err := database.dropSeries(c.SeriesByMeasurement); err != nil { return fmt.Errorf("failed to remove series from index") } return nil @@ -1550,15 +1536,15 @@ func (s *Server) applyDropSeries(m *messaging.Message) error { } // DropSeries deletes from an existing series. -func (s *Server) DropSeries(database string, seriesIDs []uint32) error { - c := dropSeriesCommand{Database: database, SeriesIDs: seriesIDs} +func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]uint32) error { + c := dropSeriesCommand{Database: database, SeriesByMeasurement: seriesByMeasurement} _, err := s.broadcast(dropSeriesMessageType, c) return err } type dropSeriesCommand struct { - Database string `json:"datbase"` - SeriesIDs []uint32 `json:"seriesIds"` + Database string `json:"datbase"` + SeriesByMeasurement map[string][]uint32 `json:"seriesIds"` } // Point defines the values that will be written to the database @@ -2076,10 +2062,19 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result { s.mu.RLock() + seriesByMeasurement := make(map[string][]uint32) // Handle the simple `DROP SERIES ` case. if stmt.Source == nil && stmt.Condition == nil { + for _, db := range s.databases { + for _, m := range db.measurements { + if m.seriesByID[stmt.SeriesID] != nil { + seriesByMeasurement[m.Name] = []uint32{stmt.SeriesID} + } + } + } + s.mu.RUnlock() - return &Result{Err: s.DropSeries(database, []uint32{stmt.SeriesID})} + return &Result{Err: s.DropSeries(database, seriesByMeasurement)} } // Handle the more complicated `DROP SERIES` with sources and/or conditions... @@ -2098,25 +2093,24 @@ func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, return &Result{Err: err} } - var ids seriesIDs for _, m := range measurements { - var tmpIDs seriesIDs + var ids seriesIDs if stmt.Condition != nil { // Get series IDs that match the WHERE clause. filters := map[uint32]influxql.Expr{} - tmpIDs, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) + ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters) // TODO: check return of walkWhereForSeriesIds for fields } else { // No WHERE clause so get all series IDs for this measurement. - tmpIDs = m.seriesIDs + ids = m.seriesIDs } - ids = ids.union(tmpIDs) + seriesByMeasurement[m.Name] = ids } s.mu.RUnlock() - return &Result{Err: s.DropSeries(database, []uint32(ids))} + return &Result{Err: s.DropSeries(database, seriesByMeasurement)} } func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result { From c8afd6242a20ca18c9a252c19d412c4aea2e9228 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 22:29:37 -0700 Subject: [PATCH 31/32] deleteing a key that does not exist is ok --- database.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/database.go b/database.go index 4774b62538f..97cfa9a83af 100644 --- a/database.go +++ b/database.go @@ -1146,10 +1146,7 @@ func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error { delete(db.series, id) // Remove series information from measurements - m := db.measurements[measurement] - if !m.dropSeries(id) { - return fmt.Errorf("failed to remove series id %d from measurment %q", id, m.Name) - } + db.measurements[measurement].dropSeries(id) // Remove shard data for _, rp := range db.policies { From 0f42be34bd2c45f3cb6ae4c83d4706cff6470d8d Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Sat, 21 Feb 2015 22:32:07 -0700 Subject: [PATCH 32/32] check that the store exists --- shard.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/shard.go b/shard.go index 23264fcf5c1..857e7a02998 100644 --- a/shard.go +++ b/shard.go @@ -157,6 +157,9 @@ func (s *Shard) writeSeries(batch []byte) error { } func (s *Shard) dropSeries(seriesID uint32) error { + if s.store == nil { + return nil + } return s.store.Update(func(tx *bolt.Tx) error { return tx.DeleteBucket(u32tob(seriesID)) })